Source code for canberry.logic

# -*- coding: utf-8 -*-
"""
High-level functions to read and write a sensor of the MOVIDRIVE traction
converter
"""
from __future__ import print_function, absolute_import, division

import logging

import can
from .can_utils import make_sdo, bytes_to_int, Service
from .utils import list_attributes, read_config


__author__ = 'Florian Wilhelm'
__copyright__ = 'Florian Wilhelm'

_logger = logging.getLogger(__name__)


[docs]class Sensor(object): """ Namespace for convenient and consistent naming """ SPEED = 'speed' DUMMY1 = 'dummy1' DUMMY2 = 'dummy2' code = {SPEED: 0x207E, DUMMY1: 0xFFFF, DUMMY2: 0xFFFF} @classmethod
[docs] def list_all(cls): attrs = list_attributes(cls) sensors = {k: v for k, v in attrs.items() if isinstance(v, basestring)} if not read_config()['debug']: sensors = {k: v for k,v in sensors.items() if not k.startswith('DUMMY')} return sensors
[docs]def read_sensor(sensor): """ Retrieve the data from a sensor :param sensor: name of a sensor according to :obj:`inSensor` :return: sensor data as dictionary """ identifier = read_config()['identifier'] bus = can.interface.Bus() services = [Service.READ_PARAM, Service.READ_DEFAULT, Service.READ_MIN, Service.READ_MAX, Service.READ_SCALE] response = dict() for service in services: msg = make_sdo(recipient=identifier, index=Sensor.code[sensor], service=service) _logger.debug("Sending {} message to {}...".format(service, sensor)) if bus.send(msg) < 0: raise RuntimeError('Error sending message') _logger.debug("Waiting for message...") reply = bus.recv() _logger.debug("Message received:\n{}".format(reply)) response[service] = bytes_to_int(reply.data[4:8]) return response
[docs]def is_sensor_known(sensor): """ Check if sensor is known :param sensor: sensor as string :return: boolean """ for known_sensor in Sensor.list_all().values(): if sensor == known_sensor: return True return False
[docs]def write_sensor(sensor, value, volatile=False): """ Write a value to a sensor :param sensor: name of a sensor according to :obj:`inSensor` :param value: value to write :param volatile: write parameter volatile as boolean """ identifier = read_config()['identifier'] bus = can.interface.Bus() if volatile: service = Service.WRITE_PARAM_VOLATILE else: service = Service.WRITE_PARAM msg = make_sdo(recipient=identifier, index=Sensor.code[sensor], service=service, value=value) _logger.debug("Sending {} message with value {} message " "to {}...".format(service, value, sensor)) if bus.send(msg) < 0: raise RuntimeError('Error sending message')