Source code for pyzigbee.drivers.basedriver

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Legrand France
# All rights reserved

from pyzigbee.core.exceptions import PyZigBeeDenied
from pyzigbee.core.exceptions import PyZigBeeFailed
from pyzigbee.core.exceptions import PyZigBeeBadFormat
from pyzigbee.core.exceptions import PyZigBeeNotSupported


class BaseDriver(object):
    """Base driver inherited by all the drivers.

    The public methods (``open``, ``close``, ``write``, ``read``,
    ``set_blocking_mode``, ``set_unblocking_mode``) track and check the
    ``is_open`` flag, then delegate to the ``on_*`` / ``set_*`` hooks.
    Every hook raises ``PyZigBeeNotSupported`` in this base class and is
    meant to be overridden by a concrete driver.
    """

    def __init__(self, **kwargs):
        """Create a driver in the closed state.

        Keyword arguments are accepted but not used by the base class.
        """
        self.is_open = False

    # ------------------------------------------------------------------
    # Hooks to be overridden by concrete drivers
    # ------------------------------------------------------------------

    def on_open(self):
        """Hook called by ``open()``; must be overridden."""
        raise PyZigBeeNotSupported("on_open: This method must be"
                                   " implemented by your driver")

    def on_close(self):
        """Hook called by ``close()``; must be overridden."""
        raise PyZigBeeNotSupported("on_close: This method must be"
                                   " implemented by your driver")

    def on_write(self, data):
        """Hook called by ``write()`` with the data to send; must be
        overridden."""
        raise PyZigBeeNotSupported("on_write: This method must be"
                                   " implemented by your driver")

    def on_read(self, to_read, stop_on):
        """Hook called by ``read()``; must be overridden.

        ``to_read`` and ``stop_on`` are forwarded untouched from
        ``read()``; their exact meaning is defined by the concrete driver
        (presumably an amount to read and a terminator -- confirm against
        the driver implementations).
        """
        raise PyZigBeeNotSupported("on_read: This method must be"
                                   " implemented by your driver")

    def set_blocking(self):
        """Hook called by ``set_blocking_mode()``; must be overridden."""
        raise PyZigBeeNotSupported("set_blocking: This method must be"
                                   " implemented by your driver")

    def set_unblocking(self, timeout=0):
        """Hook called by ``set_unblocking_mode()`` with an integer
        ``timeout``; must be overridden."""
        raise PyZigBeeNotSupported("set_unblocking: This method must be"
                                   " implemented by your driver")

    def get_info(self):
        """Return driver information; must be overridden."""
        raise PyZigBeeNotSupported("get_info: This method must be"
                                   " implemented by your driver")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def open(self):
        """Open the driver.

        Raises:
            PyZigBeeDenied: if the driver is already open.
            Exception: whatever ``on_open()`` raises is propagated; in that
                case the driver is left in the closed state.
        """
        if self.is_open:
            raise PyZigBeeDenied("Driver is already open")

        # The flag is set before the hook runs (so a hook may rely on it),
        # but it is rolled back if the hook fails: otherwise a failed open
        # would leave the driver permanently reported as open.
        self.is_open = True
        try:
            self.on_open()
        except Exception:
            self.is_open = False
            raise

    def close(self):
        """Close the driver.

        The ``is_open`` flag is cleared before ``on_close()`` is called, so
        the driver is reported closed even if the hook raises.

        Raises:
            PyZigBeeDenied: if the driver is already closed.
        """
        if not self.is_open:
            raise PyZigBeeDenied("Driver is already closed")

        self.is_open = False
        self.on_close()

    def write(self, data):
        """Write ``data`` through ``on_write()``.

        Raises:
            PyZigBeeDenied: if the driver is closed.
            PyZigBeeFailed: if ``on_write()`` raises any ``Exception``.
        """
        if not self.is_open:
            raise PyZigBeeDenied("Driver is closed")

        try:
            self.on_write(data)
        except Exception as error:
            raise PyZigBeeFailed("failed to write data to driver (%s)"
                                 % error)

    def read(self, to_read=None, stop_on=None):
        """Read through ``on_read()`` and return its result.

        ``to_read`` and ``stop_on`` are passed to the hook as keyword
        arguments without interpretation.

        Raises:
            PyZigBeeDenied: if the driver is closed.
            PyZigBeeFailed: if ``on_read()`` raises any ``Exception``.
        """
        if not self.is_open:
            raise PyZigBeeDenied("Driver is closed")

        try:
            return self.on_read(to_read=to_read, stop_on=stop_on)
        except Exception as error:
            raise PyZigBeeFailed("failed to read data from driver (%s)"
                                 % error)

    def set_blocking_mode(self):
        """Switch the driver to blocking mode via ``set_blocking()``.

        Raises:
            PyZigBeeDenied: if the driver is closed.
        """
        if not self.is_open:
            raise PyZigBeeDenied("Driver is closed")

        self.set_blocking()

    def set_unblocking_mode(self, timeout=0):
        """Switch the driver to non-blocking mode via ``set_unblocking()``.

        ``timeout`` is converted with ``int()`` before being handed to the
        hook.

        Raises:
            PyZigBeeDenied: if the driver is closed.
            PyZigBeeBadFormat: if ``timeout`` cannot be converted to an
                integer.
        """
        if not self.is_open:
            raise PyZigBeeDenied("Driver is closed")

        try:
            timeout = int(timeout)
        except (ValueError, TypeError) as error:
            # int() raises TypeError (not ValueError) for None, lists, ...;
            # both mean "not an integer" for the caller.
            raise PyZigBeeBadFormat("timeout passed to"
                                    " set_unblocking_mode must be an integer (%s)"
                                    % error)

        self.set_unblocking(timeout=timeout)