Source code for pyzigbee.protocols.openwebnet

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Legrand France
# All rights reserved

import re
import logging
from pyzigbee.core.log import NullHandler
from pyzigbee.core.exceptions import PyZigBeeBadFormat
from pyzigbee.protocols.baseprotocol import BaseProtocol


OWN_ACK = "*#*1##"
OWN_NACK = "*#*0##"


[docs]class OWNProtocol(BaseProtocol): """ OWN protocol is in charge of decoding/encoding OpenWebNet frames """ def __init__(self): self.logger = logging.getLogger(__name__) self.logger.addHandler(NullHandler()) def _build_where_field(self, zigbee_id=None): if zigbee_id is None: return "" else: return zigbee_id def handle_error(self, expected, received): """ Handler called on error when expected data differs from received one """ self.logger.warn("frame error (expected:%s , received: %s)", expected, received) raise PyZigBeeBadFormat("Frame error: received '%s' when " "expected was '%s'" % (received, expected)) def check_answer(self, answer): """ Handler called on received answer """ if answer == OWN_NACK: self._raise_format_error("NACK received", answer) return answer def _raise_format_error(self, msg, data): if data == OWN_NACK: self.logger.debug("OWN: received NACK (%s)", OWN_NACK) self.logger.warn("OWN format error: %s (data: %s)", msg, data) raise PyZigBeeBadFormat("OWN: %s (data: %s)" % (msg, data)) def get_info(self): return {"description": "OpenWebNet protocol"} def get_end_of_frame_sep(self): return "##" def encode_get_dev_number(self, delay=5): """ Build the frames sequence to get the number of devices delay: number of seconds to wait for an answer """ return [{"tx": "*13*65*##"}, {"rx": OWN_ACK}, {"delay": delay}, {"answer": ""}] def decode_dev_number(self, data): """ Decode the given data to find the number of devices """ m = re.match("\*\#13\*\*67\*(\S+)\#\#", data) if m is not None: dev_nb = int(m.group(1)) self.logger.debug("number of devices: %d", dev_nb) return dev_nb else: self._raise_format_error("could not extract device number from frame", data) def encode_get_dev_id(self, dev_index): """ Build the frames sequence to get the device ID from a given device index """ return [{"tx": "*#13**66#%d##" % dev_index}, {"answer": ""}] def decode_dev_id(self, data): """ Decode the given data to find the device ID """ m = re.match("\*\#13\*(.*)\#.*\#.*\#\#", data) if m is not None: dev_id = m.group(1) self.logger.debug("device ID: %s", dev_id) return dev_id else: self._raise_format_error("could not extract device ID from frame", data) def encode_get_firmware_version(self, zigbee_id=None): """ Build the frames sequence to get the firmware version of the gateway """ return [{"tx": "*#13*%s*16##" % self._build_where_field(zigbee_id)}, {"answer": ""}, {"rx": OWN_ACK}] def encode_get_hardware_version(self, zigbee_id=None): """ Build the frames sequence to get the firmware version of the gateway """ return [{"tx": "*#13*%s*17##" % self._build_where_field(zigbee_id)}, {"answer": ""}, {"rx": OWN_ACK}] def decode_firmware_version(self, data, zigbee_id=None): return self._decode_version(data, "16", zigbee_id) def decode_hardware_version(self, data, zigbee_id=None): return self._decode_version(data, "17", zigbee_id) def _decode_version(self, data, code="16", zigbee_id=None): """ Decode the given data to find the version """ escaped_id = re.escape(self._build_where_field(zigbee_id)) m = re.match("\*\#13\*%s\*%s\*(\S+)\*(\S+)\*(\S+)\#\#" % (escaped_id, code), data) if m is not None: version = m.group(1) + "." + m.group(2) + "." + m.group(3) self.logger.debug("version: %s", version) return version else: self._raise_format_error("could not extract version from frame", data) def decode_binding_id(self, data): """ Decode binding ID from the given data """ m = re.match("\*25\*35\*(\S+)\#9\#\#", data) if m is not None: binding_id = m.group(1) self.logger.debug("binding ID: %s", binding_id) return binding_id else: self._raise_format_error("could not extract bindign ID from frame", data) def encode_binding_request(self, zigbee_id): """ Encode binding request for a given zigbee ID """ zigbee_id = self._build_where_field(zigbee_id) return [{"tx": "*25*33*%s#9##" % zigbee_id}, {"rx": OWN_ACK}, {"rx": "*25*36*%s#9##" % zigbee_id}] def encode_unbinding_request(self, zigbee_id): """ Encode unbinding request for a given zigbee ID """ zigbee_id = self._build_where_field(zigbee_id) return [{"tx": "*25*34*%s#9##" % zigbee_id}, {"rx": OWN_ACK}, {"rx": "*25*36*%s#9##" % zigbee_id}]