#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Legrand France
# All rights reserved
import time
import logging
from pyzigbee.core.log import NullHandler
from pyzigbee.core.exceptions import PyZigBeeBadArgument
from pyzigbee.core.exceptions import PyZigBeeFailed
from pyzigbee.core.exceptions import PyZigBeeException
from pyzigbee.drivers.basedriver import BaseDriver
from pyzigbee.protocols.baseprotocol import BaseProtocol
class Gateway(object):
    """
    A gateway abstracts access to real devices.

    It combines a driver (an instance of BaseDriver, used to open/close the
    link and to read/write raw data) with a protocol (an instance of
    BaseProtocol, used to encode requests and decode answers).
    """

    def __init__(self, driver, protocol, description=""):
        """
        Create a gateway.

        args:
            driver: a BaseDriver instance
            protocol: a BaseProtocol instance
            description: free text returned by get_info()

        Raises PyZigBeeBadArgument if driver or protocol has the wrong type.
        """
        self.set_driver(driver)
        self.set_protocol(protocol)
        self.description = description
        self.logger = logging.getLogger(__name__)
        # getLogger(__name__) returns the same logger for every Gateway, so
        # only attach a NullHandler if none is there yet; otherwise each new
        # instance would stack one more handler on the shared logger.
        if not any(isinstance(handler, NullHandler)
                   for handler in self.logger.handlers):
            self.logger.addHandler(NullHandler())

    def set_driver(self, driver):
        """
        Set the driver used by this gateway.

        Raises PyZigBeeBadArgument if driver is not a BaseDriver instance.
        """
        if isinstance(driver, BaseDriver):
            self.driver = driver
        else:
            raise PyZigBeeBadArgument("%s is not a subclass of BaseDriver"
                                      % driver)

    def set_protocol(self, protocol):
        """
        Set the protocol used by this gateway.

        Raises PyZigBeeBadArgument if protocol is not a BaseProtocol instance.
        """
        if isinstance(protocol, BaseProtocol):
            self.protocol = protocol
        else:
            raise PyZigBeeBadArgument("%s is not a subclass of BaseProtocol"
                                      % protocol)

    def get_info(self):
        """
        Return a dict with the gateway description and the information
        reported by the driver and by the protocol
        """
        return {'description': self.description,
                'driver': self.driver.get_info(),
                'protocol': self.protocol.get_info()}

    def get_firmware_version(self, zigbee_id=None):
        """
        Query the firmware version and return the value decoded by the protocol

        Optional arg: the zigbee ID forwarded to the protocol encoder/decoder
        Raises PyZigBeeFailed if no answer was read.
        """
        sequence = self.protocol.encode_get_firmware_version(zigbee_id)
        answer = self._get_answer(sequence)
        return self.protocol.decode_firmware_version(answer, zigbee_id)

    def get_hardware_version(self, zigbee_id=None):
        """
        Query the hardware version and return the value decoded by the protocol

        Optional arg: the zigbee ID forwarded to the protocol encoder/decoder
        Raises PyZigBeeFailed if no answer was read.
        """
        sequence = self.protocol.encode_get_hardware_version(zigbee_id)
        answer = self._get_answer(sequence)
        return self.protocol.decode_hardware_version(answer, zigbee_id)

    def open(self):
        """
        Open the underlying driver and return the gateway itself
        """
        self.driver.open()
        return self

    def close(self):
        """
        Close the underlying driver and return the gateway itself
        """
        self.driver.close()
        return self

    def _run_sequence(self, sequence):
        """
        Run a protocol sequence step by step

        Each step is a mapping that may contain any of the keys below; they
        are handled in this order within one step:
            "tx": data written to the driver
            "rx": data expected back; as many items as len(expected) are
                  read and any mismatch is reported to
                  protocol.handle_error()
            "delay": pause in seconds (converted with int(), so fractional
                     values are truncated)
            "answer": read a frame up to the protocol end-of-frame separator
                      and pass it to protocol.check_answer()

        Return the last answer read, or None if no step asked for one.
        """
        answer = None
        for step in sequence:
            if "tx" in step:
                self.driver.write(step["tx"])
            if "rx" in step:
                expected = step["rx"]
                received = self.driver.read(to_read=len(expected))
                if received != expected:
                    self.protocol.handle_error(expected=expected,
                                               received=received)
            if "delay" in step:
                delay = int(step["delay"])
                self.logger.debug("sleeping for %d seconds...", delay)
                time.sleep(delay)
            if "answer" in step:
                answer = self.driver.read(
                    stop_on=self.protocol.get_end_of_frame_sep())
                self.protocol.check_answer(answer=answer)
        return answer

    def _get_answer(self, sequence):
        """
        Run a sequence and return its answer

        Raises PyZigBeeFailed if the sequence produced no answer (None).
        """
        answer = self._run_sequence(sequence)
        if answer is None:
            raise PyZigBeeFailed("Device did not reply")
        return answer

    def scan(self, delay=5):
        """
        Scan the network and return a list of ZigBee IDs

        Optional arg: delay forwarded to the protocol when it encodes the
        "number of devices" request

        A device whose ID cannot be retrieved (PyZigBeeException) is logged
        and skipped, so the returned list may be shorter than the number of
        devices reported.
        """
        self.driver.set_unblocking_mode()
        self.logger.debug("getting number of devices...")
        sequence = self.protocol.encode_get_dev_number(delay=delay)
        answer = self._get_answer(sequence)
        dev_nb = self.protocol.decode_dev_number(answer)
        self.logger.debug("%d device(s) on the network", dev_nb)

        # we can now loop over the devices
        dev_ids = []
        for index in range(dev_nb):
            try:
                self.logger.debug("getting device ID at index %d...", index)
                sequence = self.protocol.encode_get_dev_id(dev_index=index)
                answer = self._get_answer(sequence)
                dev_id = self.protocol.decode_dev_id(answer)
                self.logger.info("device ID at index %d: %s", index, dev_id)
                dev_ids.append(dev_id)
            except PyZigBeeException as error:
                # Logger.warn is a deprecated alias (removed in Python 3.13)
                self.logger.warning("failed to get device ID at index %d (%s)",
                                    index, error)
        return dev_ids

    def receive(self, timeout=None):
        """
        Receive frame from the network

        Optional arg: read timeout in seconds for non blocking mode
        When timeout is None or an empty string the driver is switched to
        blocking mode instead.
        """
        if timeout is None or timeout == "":
            self.driver.set_blocking_mode()
        else:
            self.driver.set_unblocking_mode(timeout=timeout)
        return self.driver.read(stop_on=self.protocol.get_end_of_frame_sep())

    def _decode_binding(self, zigbee_id):
        """
        Decode binding request from device

        Read one frame from the driver, decode the ID it carries and return
        it. Raises PyZigBeeBadArgument if that ID differs from zigbee_id.
        """
        answer = self.driver.read(stop_on=self.protocol.get_end_of_frame_sep())
        dev_id = self.protocol.decode_binding_id(answer)
        if dev_id != zigbee_id:
            raise PyZigBeeBadArgument("Received un/binding request from ID: %s"
                                      " (expected was: %s)"
                                      % (dev_id, zigbee_id))
        return dev_id

    def bind(self, zigbee_id):
        """
        Bind procedure

        arg: the zigbee device to bind with

        The driver is put in blocking mode, the request coming from the
        device is read and checked, then the protocol binding sequence is run.
        Raises PyZigBeeBadArgument if the request comes from another ID.
        """
        self.driver.set_blocking_mode()
        dev_id = self._decode_binding(zigbee_id)
        sequence = self.protocol.encode_binding_request(dev_id)
        self._run_sequence(sequence)

    def unbind(self, zigbee_id):
        """
        Unbind procedure

        arg: the zigbee device to unbind from

        The driver is put in blocking mode, the request coming from the
        device is read and checked, then the protocol unbinding sequence is
        run.
        Raises PyZigBeeBadArgument if the request comes from another ID.
        """
        self.driver.set_blocking_mode()
        dev_id = self._decode_binding(zigbee_id)
        sequence = self.protocol.encode_unbinding_request(dev_id)
        self._run_sequence(sequence)