#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Legrand France
# All rights reserved
import serial
import logging
import re
from pyzigbee.core.log import NullHandler
from pyzigbee.core.exceptions import PyZigBeeFailed
from pyzigbee.core.exceptions import PyZigBeeTimedOut
from pyzigbee.core.exceptions import PyZigBeeBadFormat
from pyzigbee.drivers.basedriver import BaseDriver
READ_TIMEOUT = 3
WRITE_TIMEOUT = 5
[docs]class SerialDriver(BaseDriver):
"""
Serial driver to communicate with underlying hardware
keyword args are:
- port: the serial port such as /dev/tty3 or COM3
- baudrate: the serial line speed
- parity: the serial line parity
"""
def __init__(self, **kwargs):
super(SerialDriver, self).__init__(kwargs=kwargs)
self.port = self._get_or_default(kwargs, 'port', '/dev/ttyUSB0')
self.baudrate = int(self._get_or_default(kwargs, 'baudrate', 115200))
self.parity = self._get_or_default(kwargs, 'parity', 'N')
self.dev = None
self.logger = logging.getLogger(__name__)
self.logger.addHandler(NullHandler())
def _get_or_default(self, params, key, default=None):
return params[key] if key in params.keys() else default
def _is_blocking_mode(self):
return True if self.dev.timeout is None else False
def _is_blocking_mode_str(self):
return "blocking mode" if self._is_blocking_mode() else "non-blocking mode"
def on_open(self):
try:
self.logger.debug("opening serial port %s...", self.port)
self.dev = serial.Serial(port=self.port,
baudrate=self.baudrate,
parity=self.parity,
timeout=READ_TIMEOUT,
writeTimeout=WRITE_TIMEOUT)
self.logger.debug("serial port %s open (%s)",
self.port, self._is_blocking_mode_str())
except (OSError, serial.serialutil.SerialException) as error:
self.logger.error('error when opening serial port %s (%s)',
self.port, error)
raise PyZigBeeFailed(msg="Failed to open serial port %s (%s)"
% (self.port, error))
def on_close(self):
self.logger.debug("closed serial port %s.", self.port)
self.dev = None
def on_write(self, data):
try:
self.logger.debug("write data: %s", data)
self.dev.write(data)
except serial.SerialTimeoutException:
raise PyZigBeeTimedOut("Timeout when writing to device")
def on_read(self, to_read=None, stop_on=None):
data = ""
if to_read is not None and to_read != "":
data = self.dev.read(size=int(to_read))
else:
if stop_on is not None:
stop_pattern = re.escape(stop_on)
self.logger.debug("next read will stop when receiving: %s", stop_on)
endof = False
while not endof:
byte = self.dev.read(size=1)
data += byte
# self.logger.debug("received byte: %s", byte)
if re.match(".*%s" % stop_pattern, data):
endof = True
else:
data = self.dev.read()
self.logger.debug("read data: %s", data)
return data
def get_info(self):
return {"description": "Serial driver",
"port": self.port,
"baudrate": self.baudrate,
"parity": self.parity}
def set_blocking(self):
if self.dev.timeout is not None:
self.dev.timeout = None
self.logger.debug("serial port %s set to blocking mode", self.port)
else:
self.logger.debug("serial port %s already set to blocking mode", self.port)
def set_unblocking(self, timeout=READ_TIMEOUT):
try:
if self.dev.timeout is None:
self.dev.timeout = timeout
self.logger.debug("serial port %s set to non-blocking mode (timeout: %s)",
self.port, self.dev.timeout)
else:
self.logger.debug("serial port %s already set to non-blocking mode",
self.port)
except ValueError as error:
raise PyZigBeeBadFormat("timeout must be an integer (%s)" % error)