diff --git a/README.md b/README.md index 22c4d269404c753be01ef6940270c08180bf92f3..90333e1cb779c90131c3820fe45a2f12e283471d 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ PI MQTT GPIO [](https://gitter.im/mqtt-io/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge) -Expose the Raspberry Pi GPIO pins, external IO modules and I2C sensors to an MQTT server. This allows pins to be read and switched by reading or writing messages to MQTT topics. The I2C sensors will be read periodically and publish their values. +Expose the Raspberry Pi GPIO pins, external IO modules, streams and I2C sensors to an MQTT server. This allows pins to be read and switched by reading or writing messages to MQTT topics. The streams and I2C sensors will be read periodically and publish their values. GPIO Modules ------------ @@ -22,6 +22,11 @@ Sensors - BH1750 light level sensor (`bh1750`) - DS18S20, DS1822, DS18B20, DS1825, DS28EA00, MAX31850K one-wire temperature sensors: (`ds18b`) +Streams +------- + +- Serial port (`streamserial`) + Installation ------------ @@ -163,6 +168,48 @@ sensor_inputs: digits: 2 ``` +### Streams + +Transmit data by publishing to the `home/stream/<stream write name>` topic. In the following example, this would be `home/stream/serialtx`. + +Receive data from a stream by subscribing to the `home/stream/<stream read name>` topic. In the following example, this would be `home/stream/serialrx`. + +The stream data is parsed using Python's `string_escape` to allow the transfer of control characters. + +```yaml +mqtt: + host: test.mosquitto.org + port: 1883 + user: "" + password: "" + topic_prefix: home + +stream_modules: + - name: serialcomms + module: streamserial + device: /dev/ttyS0 + baud: 115200 + cleanup: no # This optional boolean value sets whether the module's `cleanup()` function will be called when the software exits. + +stream_reads: + - name: serialrx + module: serialcomms + interval: 0.25 # Stream read polling interval in seconds + +stream_writes: + - name: serialtx + module: serialcomms +``` + +Testing example: + +```bash +# -N disables printing extra new line on each subscription +mosquitto_sub -h <broker url> -t <topic prefix>/stream/serialrx -N + +mosquitto_pub -h <broker url> -t <topic prefix>/stream/serialtx -m "testing123\r\n" +``` + ### OrangePi boards You need to specify what OrangePi board you use diff --git a/pi_mqtt_gpio/__init__.py b/pi_mqtt_gpio/__init__.py index 9ed256df001e3bc9cfbd22ef823c157dfcc97071..b602729c527bec694af69980b17f24a5f532ce40 100644 --- a/pi_mqtt_gpio/__init__.py +++ b/pi_mqtt_gpio/__init__.py @@ -149,6 +149,27 @@ sensor_modules: required: no default: yes +stream_modules: + type: list + required: no + default: [] + schema: + type: dict + allow_unknown: yes + schema: + name: + type: string + required: yes + empty: no + module: + type: string + required: yes + empty: no + cleanup: + type: boolean + required: no + default: yes + digital_inputs: type: list required: no @@ -290,6 +311,49 @@ sensor_inputs: default: 2 min: 0 +stream_reads: + type: list + required: no + default: [] + schema: + type: dict + allow_unknown: yes + schema: + name: + type: string + required: yes + empty: no + module: + type: string + required: yes + empty: no + retain: + type: boolean + required: no + default: no + interval: + type: float + required: no + default: 60 + min: 0.01 + +stream_writes: + type: list + required: no + default: [] + schema: + type: dict + allow_unknown: yes + schema: + name: + type: string + required: yes + empty: no + module: + type: string + required: yes + empty: no + logging: type: dict required: no diff --git a/pi_mqtt_gpio/modules/__init__.py b/pi_mqtt_gpio/modules/__init__.py index 69fff58827cfbd2c563e24a622f067e53748dd1b..5d8b8ee04d653a6509c65d999cadaa9c6849b31d 100644 --- a/pi_mqtt_gpio/modules/__init__.py +++ b/pi_mqtt_gpio/modules/__init__.py @@ -90,3 +90,28 @@ class GenericSensor(object): Called when closing the program to handle any cleanup operations. """ pass + +class GenericStream(object): + """ + Abstracts a generic stream interface to be implemented + by the modules in this directory. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def setup_stream(self, config): + pass + + @abc.abstractmethod + def read(self, config): + pass + + @abc.abstractmethod + def write(self, config, data): + pass + + def cleanup(self): + """ + Called when closing the program to handle any cleanup operations. + """ + pass diff --git a/pi_mqtt_gpio/modules/streamserial.py b/pi_mqtt_gpio/modules/streamserial.py new file mode 100644 index 0000000000000000000000000000000000000000..d9467817dd903e59d4f78e6a79d68d1f4cc41eaf --- /dev/null +++ b/pi_mqtt_gpio/modules/streamserial.py @@ -0,0 +1,49 @@ +from __future__ import print_function + +from pi_mqtt_gpio.modules import GenericStream +from serial import Serial + +REQUIREMENTS = ("serial",) +CONFIG_SCHEMA = { + "device": {"type": "string", "required": True, "empty": False}, + "baud": {"type": "integer", "required": True, "empty": False}, +} + +PORTS_USED = {} + +class Stream(GenericStream): + """ + Implementation of stream class for outputting to STDIO. + """ + + def __init__(self, config): + global PORTS_USED + #print("__init__(config=%r)" % config) + if not config['device'] in PORTS_USED: + self.ser = Serial(config['device'], config['baud'], timeout=20) + self.ser.flushInput() + PORTS_USED[config['device']] = self.ser + else: + self.ser = PORTS_USED[config['device']] + + def setup_stream(self, config): + #print("setup_stream(config=%r)" % config) + pass + + def read(self, config): + if (self.ser.inWaiting() <= 0): + return None + data = self.ser.read(self.ser.inWaiting()).decode('string_escape') + #print("read(config=%r) = %s" % (config, data)) + return data + + def write(self, config, data): + #print("write(config=%r, data=%s)" % (config,data)) + self.ser.write(data) + pass + + def cleanup(self): + for device, s in PORTS_USED: + s.close() + PORTS_USED.pop(device) + pass diff --git a/pi_mqtt_gpio/server.py b/pi_mqtt_gpio/server.py index 4c60d2bf8f85375653e066a1e6bb730eba264cb0..c43ec141ab735a8316c7d35371ea729707a80468 100644 --- a/pi_mqtt_gpio/server.py +++ b/pi_mqtt_gpio/server.py @@ -33,9 +33,13 @@ LOG_LEVEL_MAP = { RECONNECT_DELAY_SECS = 5 GPIO_MODULES = {} # storage for gpio modules SENSOR_MODULES = {} # storage for sensor modules +STREAM_MODULES = {} # storage for stream modules GPIO_CONFIGS = {} # storage for gpios SENSOR_CONFIGS = {} # storage for sensors +STREAM_CONFIGS = {} # storage for streams SENSOR_INPUT_CONFIGS = {} # storage for sensor input configs +STREAM_READ_CONFIGS = {} # storage for streams read configs +STREAM_WRITE_CONFIGS = {} # storage for streams write configs LAST_STATES = {} GPIO_INTERRUPT_LOOKUP = {} SET_TOPIC = "set" @@ -44,6 +48,7 @@ SET_OFF_MS_TOPIC = "set_off_ms" OUTPUT_TOPIC = "output" INPUT_TOPIC = "input" SENSOR_TOPIC = "sensor" +STREAM_TOPIC = "stream" _LOG = logging.getLogger("mqtt_gpio") @@ -117,6 +122,20 @@ def output_by_name(output_name): _LOG.warning("No output found with name of %r", output_name) +def stream_write_by_name(name): + """ + Returns the stream write configuration for a given name. + :param name: The name of the write + :type output_name: str + :return: The output configuration or None if not found + :rtype: dict + """ + for write in stream_writes: + if write["name"] == name: + return write + _LOG.warning("No write found with name of %r", name) + + def set_pin(topic_prefix, output_config, value): """ Sets the output pin to a new value and publishes it on MQTT. @@ -145,6 +164,26 @@ def set_pin(topic_prefix, output_config, value): payload=payload, ) +def stream_write_output(topic_prefix, stream_write_config, data): + """ + Writes data to stream + :param topic_prefix: the name of the topic, the pin is published + :type topic_prefix: string + :param stream_write_config: The write stream configuration + :type stream_write_config: dict + :param data: data to write + :type value: string + :return: None + :rtype: NoneType + """ + sw = STREAM_MODULES[stream_write_config["module"]] + sw.write(stream_write_config["name"], data.encode("utf-8")) + _LOG.info( + "Write %r %r write %s", + stream_write_config["module"], + stream_write_config["name"], + data, + ) def get_pin(in_conf, module): """ @@ -235,6 +274,22 @@ def handle_set_ms(topic_prefix, msg, value): ms, ) +def handle_raw(topic_prefix, msg): + """ + Handles an incoming raw MQTT message. + :param topic_prefix: the name of the topic + :type topic_prefix: string + :param msg: The incoming MQTT message + :type msg: paho.mqtt.client.MQTTMessage + :return: None + :rtype: NoneType + """ + stream_write_name = stream_write_name_from_topic(msg.topic, topic_prefix) + stream_write_config = stream_write_by_name(stream_write_name) + if stream_write_config is None: + return + payload = msg.payload.decode("string_escape") + stream_write_output(topic_prefix, stream_write_config, payload) def install_missing_requirements(module): """ @@ -269,6 +324,25 @@ def install_missing_requirements(module): ) +def type_from_topic(topic, topic_prefix): + """ + Return the topic type for this topic + The topics are formatted as topic_prefix/type[/parameter...]. + The parameter section is optional. + :param topic: String such as 'mytopicprefix/output/tv_lamp/set' + :type topic: str + :param topic_prefix: Prefix of our topicsclient, + :type topic_prefix: str + :return: Type for this topic + :rtype: str + """ + # Strip off the prefix + lindex = len("%s/" % topic_prefix) + s = topic[lindex:] + # Get remaining fields, the first one is the type + fields = s.split('/') + return fields[0] + def output_name_from_topic(topic, topic_prefix, suffix): """ Return the name of the output which the topic is setting. @@ -288,13 +362,28 @@ def output_name_from_topic(topic, topic_prefix, suffix): return topic[lindex:rindex] -def init_mqtt(config, digital_outputs): +def stream_write_name_from_topic(topic, topic_prefix): + """ + Return the name of the stream write which the topic is setting. + :param topic: String such as 'mytopicprefix/stream/tx' + :type topic: str + :param topic_prefix: Prefix of our topicsclient, + :type topic_prefix: str + :return: Name of the output this topic is setting + :rtype: str + """ + lindex = len("%s/%s/" % (topic_prefix, STREAM_TOPIC)) + return topic[lindex:] + +def init_mqtt(config, digital_outputs, stream_writes): """ Configure MQTT client. :param config: Validated config dict containing MQTT connection details :type config: dict :param digital_outputs: List of validated config dicts for digital outputs :type digital_outputs: list + :param stream_writes: List of validated config dicts for stream writes + :type stream_writes: list :return: Connected and initialised MQTT client :rtype: paho.mqtt.client.Client """ @@ -371,6 +460,14 @@ def init_mqtt(config, digital_outputs): ) client.subscribe(topic, qos=1) _LOG.info("Subscribed to topic: %r", topic) + for stream_write_conf in stream_writes: + topic = "%s/%s/%s" % ( + topic_prefix, + STREAM_TOPIC, + stream_write_conf["name"], + ) + client.subscribe(topic, qos=1) + _LOG.info("Subscribed to topic: %r", topic) client.publish( status_topic, config["status_payload_running"], qos=1, retain=True ) @@ -379,7 +476,7 @@ def init_mqtt(config, digital_outputs): for in_conf in digital_inputs: hass_announce_digital_input(in_conf, topic_prefix, config) for out_conf in digital_outputs: - hass_announce_digital_output(out_conf, topic_prefix, config) + hass_announce_digital_output(out_conf, topic_prefix, config) elif rc == 1: _LOG.fatal("Incorrect protocol version used to connect to MQTT broker.") sys.exit(1) @@ -409,13 +506,19 @@ def init_mqtt(config, digital_outputs): :rtype: NoneType """ try: - _LOG.info("Received message on topic %r: %r", msg.topic, msg.payload) - if msg.topic.endswith("/%s" % SET_TOPIC): - handle_set(topic_prefix, msg) - elif msg.topic.endswith("/%s" % SET_ON_MS_TOPIC): - handle_set_ms(topic_prefix, msg, True) - elif msg.topic.endswith("/%s" % SET_OFF_MS_TOPIC): - handle_set_ms(topic_prefix, msg, False) + topic_type = type_from_topic(msg.topic, topic_prefix) + _LOG.info("Received message on topic %r type: %r: %r", msg.topic, topic_type, msg.payload) + if topic_type == OUTPUT_TOPIC: + if msg.topic.endswith("/%s" % SET_TOPIC): + handle_set(topic_prefix, msg) + elif msg.topic.endswith("/%s" % SET_ON_MS_TOPIC): + handle_set_ms(topic_prefix, msg, True) + elif msg.topic.endswith("/%s" % SET_OFF_MS_TOPIC): + handle_set_ms(topic_prefix, msg, False) + else: + _LOG.warning("Unhandled output topic %r.", msg.topic) + elif topic_type == STREAM_TOPIC: + handle_raw(topic_prefix, msg) else: _LOG.warning("Unhandled topic %r.", msg.topic) except InvalidPayload as exc: @@ -472,6 +575,27 @@ def configure_sensor_module(sensor_config): return sensor_module.Sensor(sensor_config) +def configure_stream_module(stream_config): + """ + Imports stream module, validates its config and returns an instance of it. + :param stream_config: Module configuration values + :type stream_config: dict + :return: Configured instance of the stream module + :rtype: pi_mqtt_gpio.modules.GenericStream + """ + stream_module = import_module("pi_mqtt_gpio.modules.%s" % stream_config["module"]) + # Doesn't need to be a deep copy because we won't modify the base + # validation rules, just add more of them. + module_config_schema = BASE_SCHEMA.copy() + module_config_schema.update(getattr(stream_module, "CONFIG_SCHEMA", {})) + module_validator = cerberus.Validator(module_config_schema) + if not module_validator.validate(stream_config): + raise ModuleConfigInvalid(module_validator.errors) + stream_config = module_validator.normalized(stream_config) + install_missing_requirements(stream_module) + return stream_module.Stream(stream_config) + + def initialise_digital_input(in_conf, gpio): """ Initialises digital input. @@ -642,6 +766,124 @@ def sensor_timer_thread(SENSOR_MODULES, sensor_inputs, topic_prefix): sleep(max(0, next_call - time())) +def validate_stream_read_config(stream_conf): + """ + Validates stream read config. + :param stream_conf: Stream read config + :type stream_conf: dict + :return: None + :rtype: NoneType + """ + stream_module = import_module( + "pi_mqtt_gpio.modules.%s" % STREAM_CONFIGS[stream_conf["module"]]["module"] + ) + # Doesn't need to be a deep copy because we won't modify the base + # validation rules, just add more of them. + stream_read_schema = CONFIG_SCHEMA["stream_reads"]["schema"]["schema"].copy() + stream_read_schema.update(getattr(stream_module, "STREAM_SCHEMA", {})) + stream_validator = cerberus.Validator(stream_read_schema) + if not stream_validator.validate(stream_conf): + raise ModuleConfigInvalid(stream_validator.errors) + return stream_validator.normalized(stream_conf) + + +def validate_stream_write_config(stream_conf): + """ + Validates stream write config. + :param stream_conf: Stream write config + :type stream_conf: dict + :return: None + :rtype: NoneType + """ + stream_module = import_module( + "pi_mqtt_gpio.modules.%s" % STREAM_CONFIGS[stream_conf["module"]]["module"] + ) + # Doesn't need to be a deep copy because we won't modify the base + # validation rules, just add more of them. + stream_write_schema = CONFIG_SCHEMA["stream_writes"]["schema"]["schema"].copy() + stream_write_schema.update(getattr(stream_module, "STREAM_SCHEMA", {})) + stream_validator = cerberus.Validator(stream_write_schema) + if not stream_validator.validate(stream_conf): + raise ModuleConfigInvalid(stream_validator.errors) + return stream_validator.normalized(stream_conf) + + +def initialise_stream(stream_conf, stream): + """ + Initialises stream. + :param stream_conf: Stream config + :type stream_conf: dict + :param stream: Instance of GenericStream to use + :type stream: pi_mqtt_gpio.modules.GenericStream + :return: None + :rtype: NoneType + """ + stream.setup_stream(stream_conf) + + +def stream_timer_thread(STREAM_MODULES, stream_reads, topic_prefix): + """ + Timer thread for the stream reads + To reduce cpu usage, there is only one cyclic thread for all streams. + At the beginning the cycle time is calculated (ggT) to match all intervals. + For each stream interval, the reduction value is calculated, that triggers + the read, round and publish for the stream, when loop_count is a multiple + of it. In worst case, the cycle_time is 1 second, in best case, e.g., when + there is only one stream, cycle_time is its interval. + """ + # calculate the min time + arr = [] + for stream_conf in stream_reads: + arr.append(stream_conf.get("interval", 60)) + + # get the greatest common divisor (gcd) for the list of interval times + cycle_time = reduce(lambda x, y: gcd(x, y), arr) + + _LOG.debug( + "stream_timer_thread: calculated cycle_time will be %d seconds", cycle_time + ) + + for stream_conf in stream_reads: + stream_conf["interval_reduction"] = stream_conf.get("interval", 60) / cycle_time + + # Start the cyclic thread + loop_count = 0 + next_call = time() + while True: + loop_count += 1 + for stream_conf in stream_reads: + if loop_count % stream_conf["interval_reduction"] == 0: + stream = STREAM_MODULES[stream_conf["module"]] + + try: + data = stream.read(stream_conf) + if data is None: + continue + if len(data) == 0: + continue + + _LOG.info( + "stream_timer_thread: reading stream '%s' data %s", + stream_conf["name"], + data, + ) + client.publish( + "%s/%s/%s" % (topic_prefix, STREAM_TOPIC, stream_conf["name"]), + payload=data, + retain=stream_conf["retain"], + ) + except ModuleConfigInvalid as exc: + _LOG.error( + "stream_timer_thread: failed to read stream '%s': %s", + stream_conf["name"], + exc, + ) + + # schedule next call + next_call = next_call + cycle_time # every cycle_time sec + sleep(max(0, next_call - time())) + + def gpio_interrupt_callback(module, pin): try: in_conf = GPIO_INTERRUPT_LOOKUP[module][pin] @@ -740,6 +982,7 @@ def hass_announce_digital_output(out_conf, topic_prefix, mqtt_config): def main(args): global digital_inputs global digital_outputs + global stream_writes global client global scheduler @@ -762,8 +1005,10 @@ def main(args): digital_inputs = config["digital_inputs"] digital_outputs = config["digital_outputs"] sensor_inputs = config["sensor_inputs"] + stream_reads = config["stream_reads"] + stream_writes = config["stream_writes"] - client = init_mqtt(config["mqtt"], config["digital_outputs"]) + client = init_mqtt(config["mqtt"], config["digital_outputs"], config["stream_writes"]) topic_prefix = config["mqtt"]["topic_prefix"] # Install modules for GPIOs @@ -794,6 +1039,20 @@ def main(args): ) sys.exit(1) + # Install modules for Streams + for stream_config in config.get("stream_modules", {}): + STREAM_CONFIGS[stream_config["name"]] = stream_config + try: + STREAM_MODULES[stream_config["name"]] = configure_stream_module(stream_config) + except ModuleConfigInvalid as exc: + _LOG.error( + "Config for %r module named %r did not validate:\n%s", + stream_config["module"], + stream_config["name"], + yaml.dump(exc.errors), + ) + sys.exit(1) + for in_conf in digital_inputs: initialise_digital_input(in_conf, GPIO_MODULES[in_conf["module"]]) LAST_STATES[in_conf["name"]] = None @@ -816,6 +1075,36 @@ def main(args): sys.exit(1) initialise_sensor_input(sens_conf, SENSOR_MODULES[sens_conf["module"]]) + for stream_conf in stream_reads: + try: + STREAM_READ_CONFIGS[stream_conf["name"]] = validate_stream_read_config( + stream_conf + ) + except ModuleConfigInvalid as exc: + _LOG.error( + "Config for %r stream named %r did not validate:\n%s", + STREAM_CONFIGS[stream_conf["module"]]["module"], + stream_conf["name"], + yaml.dump(exc.errors), + ) + sys.exit(1) + initialise_stream(stream_conf, STREAM_MODULES[stream_conf["module"]]) + + for stream_conf in stream_writes: + try: + STREAM_WRITE_CONFIGS[stream_conf["name"]] = validate_stream_write_config( + stream_conf + ) + except ModuleConfigInvalid as exc: + _LOG.error( + "Config for %r stream named %r did not validate:\n%s", + STREAM_CONFIGS[stream_conf["module"]]["module"], + stream_conf["name"], + yaml.dump(exc.errors), + ) + sys.exit(1) + initialise_stream(stream_conf, STREAM_MODULES[stream_conf["module"]]) + try: client.connect(config["mqtt"]["host"], config["mqtt"]["port"], 60) except socket.error as err: @@ -841,6 +1130,21 @@ def main(args): sensor_thread.daemon = True sensor_thread.start() + # Starting the stream thread (if there are streams configured) + if stream_reads: + stream_thread = threading.Thread( + target=stream_timer_thread, + kwargs={ + "STREAM_MODULES": STREAM_MODULES, + "stream_reads": STREAM_READ_CONFIGS.values(), + "topic_prefix": topic_prefix, + }, + ) + stream_thread.name = "pi-mqtt-gpio_StreamReader" + # stops the thread, when main program terminates + stream_thread.daemon = True + stream_thread.start() + while True: for in_conf in digital_inputs: # Only read pins that are not configured as interrupt.