Source code for mascara_electronics

"""
.. module:: mascara_electronics
.. moduleauthor:: Julien Spronck, Anna-Lea Lesage
.. created:: Feb 18, 2014
.. version:: 0.1 
.. last change:: May 14, 2014 by Anna-Lea.
.. change history:
     Added the function matchPort2ID, to fetch the serial number of the 
            connected device. Implemented this in findCOMPort. Now the instance 
            RHsensor does deliver a serial number!! -- Apr 2, 2014
    Added listallport() to list all the ports in use on the computer -- Apr 3, 2014
    Changed RHSensor.__init__() to run through the available ports and find
            the RH sensors. Now 2 sensors can be connected on random ports and found. -- Apr 4, 2014
    Added PDU class
                  
This module handles external communication with devices such as humidity/temperature sensors, relays, LEDs and PDUs.

Example:
To read the humidity sensor
   ``
    >>> import Maselec
    >>> rh = Maselec.RHSensor()
    >>> rh.getHumidity() # relative humidity in %
    '25.7'
    >>> rh.getTemperature() # temperature in Celsius
    '21.1'
    >>> rh.serialNumber 
    'N14010707A'
   ``
"""

import serial
import warnings
import ctypes as ct
import socket
import time
import mascara_config as cfg
import mascara_credentials as creds 
import telnetlib

class OnOffObject(object):
    '''Base class for the two-state devices (relays, heaters, LEDs).

    Each device is driven through one digital output line of the DAQ. The
    ``state`` attribute ('on' / 'off') records the last command issued; it is
    not read back from the hardware.
    '''

    def __init__(self, daq, channel, inversed=False):
        '''Remember the wiring and switch the device off.

        :param daq: an instance of the DAQ class representing the DAQ
            (only its ``dBitOut(channel, value)`` method is used here)
        :param channel: digital channel of the object
            *see :ref:`mascara_config` for existing channels*
        :param inversed: when true the output polarity is flipped
            (bit 1 = on, bit 0 = off); by default bit 0 = on, bit 1 = off
        '''
        self.daq = daq
        self.channel = channel
        self.inversed = inversed
        self.state = 'off'
        # Force the hardware into the state we just recorded.
        self.turnOff()

    def turnOn(self):
        '''Drive the output line to its "on" level and record the new state.'''
        level = 1 if self.inversed else 0
        self.daq.dBitOut(self.channel, level)
        self.state = 'on'

    def turnOff(self):
        '''Drive the output line to its "off" level and record the new state.'''
        level = 0 if self.inversed else 1
        self.daq.dBitOut(self.channel, level)
        self.state = 'off'

    def isOn(self):
        '''Return True when the last command issued was "on".'''
        return 'on' == self.state

    def isOff(self):
        '''Return True when the last command issued was "off".'''
        return 'off' == self.state
class Relay(OnOffObject):
    '''On/off object specialised (by name only) for the relays.

    Adds no behaviour of its own; everything is inherited from OnOffObject.
    '''

    def __init__(self, daq, channel, inversed=False):
        '''Forward the DAQ, channel and polarity to the base class.'''
        super(Relay, self).__init__(daq, channel, inversed=inversed)
class CameraRelay(OnOffObject):
    '''On/off object for a relay; presumably the one feeding the cameras,
    judging by the class name (nothing in the code distinguishes it from Relay).
    '''

    def __init__(self, daq, channel, inversed=False):
        '''Forward the DAQ, channel and polarity to the base class.'''
        super(CameraRelay, self).__init__(daq, channel, inversed=inversed)
class Heater(OnOffObject):
    '''On/off object specialised (by name only) for the heaters.

    Always uses the default, non-inverted polarity of OnOffObject.
    '''

    def __init__(self, daq, channel):
        '''Forward the DAQ and channel to the base class.'''
        super(Heater, self).__init__(daq, channel)
class LEDS(OnOffObject):
    '''On/off object specialised (by name only) for the LEDs.

    Always uses the default, non-inverted polarity of OnOffObject.
    '''

    def __init__(self, daq, channel):
        '''Forward the DAQ and channel to the base class.'''
        super(LEDS, self).__init__(daq, channel)
class LimitSwitch(object):
    '''A limit switch wired to one digital output and one analog input of the DAQ.'''

    def __init__(self, daq, channelDOut, channelAIn):
        '''Store the wiring and set the digital output line to 1.

        :param daq: an instance of the DAQ class representing the DAQ
        :param channelDOut: channel of the digital output of the limit switch
        :param channelAIn: channel of the analog input of the limit switch
        '''
        self.channelAIn = channelAIn
        self.channelDOut = channelDOut
        self.daq = daq
        self.daq.dBitOut(self.channelDOut, 1)

    def isPushed(self):
        '''Return True when the analog input reads below 2.5 (V).

        The digital output is re-asserted to 1 before every reading.
        '''
        self.daq.dBitOut(self.channelDOut, 1)
        reading = self.daq.vIn(self.channelAIn)
        return reading < 2.5
class PowerSupply(object):
    """
    Class for the power supplies.

    The supply is programmed and monitored through analog channels of the
    DAQ. Set-points are scaled to a 0-5 V DAQ output, read-backs from a
    0-10 V range (see the scaling in the get/set methods).
    """

    # Highest voltage set-point ever sent to the supply. The supply itself
    # can go to vMax (32 V) but the peltiers will not like more than 24 V.
    vSafe = 24.

    def __init__(self, daq, ID):
        """
        Initialize the power supply and program it to 0 V / 0 A.

        :param daq: an instance from the DAQ class
        :param ID: the ID of the power supply (1 or 2)
        :raises ValueError: if ID is anything other than 1 or 2
        """
        # Membership test rather than a range test: a value such as 1.5 used
        # to pass validation and leave chVOut/chCOut undefined.
        if ID not in (1, 2):
            raise ValueError('ID can only be 1 or 2 (which of the two power supplies)')
        self.id = ID
        self.vMax = 32.  # How many Volts can it deliver?
        self.cMax = 20.  # How many amps can it deliver?
        # The set-point (input) channels are the same for both IDs.
        self.chVIn = cfg.chPSVoltage  # channel for input voltage
        self.chCIn = cfg.chPSCurrent  # channel for input current
        self.daq = daq
        if ID == 1:
            self.chVOut = cfg.chPS1VoltageOut  # channel for output voltage
            self.chCOut = cfg.chPS1CurrentOut  # channel for output current
        else:
            self.chVOut = cfg.chPS2VoltageOut  # channel for output voltage
            self.chCOut = cfg.chPS2CurrentOut  # channel for output current
        self.setVoltage(0)
        self.setCurrent(0)

    def getVoltage(self):
        """
        Gets output voltage of the power supply (DAQ reading / 10 * vMax).
        """
        # There should be voltage dividers between the output of the power supply and the daq
        return self.daq.vIn(self.chVOut) / 10. * self.vMax

    def getCurrent(self):
        """
        Gets output current of the power supply (DAQ reading / 10 * cMax).
        """
        # There should be voltage dividers between the output of the power supply and the daq
        return self.daq.vIn(self.chCOut) / 10. * self.cMax

    def setVoltage(self, V):
        """
        Sets output voltage of the power supply.

        The request is clamped to [0, 24] V: do NOT go higher than 24V,
        the peltiers will not like it.

        :param V: requested voltage in Volts
        """
        # There should be voltage doublers between the input of the power supply and the daq
        if V < 0:
            V = 0
        if V > self.vSafe:
            V = self.vSafe
        self.daq.vOut(self.chVIn, V / self.vMax * 5.)

    def setCurrent(self, C):
        """
        Sets output current (limit) of the power supply, clamped to [0, cMax].

        :param C: requested current in Amps
        """
        # There should be voltage doublers between the input of the power supply and the daq
        if C < 0:
            C = 0
        if C > self.cMax:
            C = self.cMax
        self.daq.vOut(self.chCIn, C / self.cMax * 5.)

    def turnOn(self, V=24, C=20):
        """Sets the voltage to V (default 24 V) and the current to C (default 20 A)."""
        self.setVoltage(V)
        self.setCurrent(C)

    def turnOff(self):
        """Sets the current of the power supply to 0 A.

        NOTE(review): the voltage set-point is left untouched here, although
        the previous docstring claimed it was set to 0 V as well -- confirm
        which behaviour is intended before changing the hardware command.
        """
        self.setCurrent(0)
class PDU(object):
    '''Telnet client for a switched power distribution unit (PDU).

    The dialogue expects the prompts 'User Name : ', 'Password : ' and
    'APC> '. Commands are sent as plain text terminated by a carriage return.
    '''

    def __init__(self, ip, username=creds.pdu_username, password=creds.pdu_password, timeout=5):
        '''Initialization of the PDU.

        The PDUs are accessed using Telnet protocol, thus they have each a
        username and password to access the PDU. A connection attempt is made
        immediately; its outcome is available in ``self.connected``.

        :param ip: the IP address of the PDU
        :keyword str username: the identifier of the pdu
        :keyword str password: the password to access the pdu
        :keyword int timeout: timeout (seconds) for reading the PDU buffer
        '''
        self.ip = ip
        self.username = username
        self.password = password
        self.timeout = timeout
        self.tn = None
        self.connected = False
        self.attempts = 0
        self.connect()

    def connect(self):
        '''Connect to the PDU using telnet and log in.

        Never raises: on any error the message is printed, the half-open
        session (if any) is closed and ``self.connected`` is set to False.
        '''
        self.tn = None
        try:
            self.tn = telnetlib.Telnet(self.ip, timeout=self.timeout)
            self.tn.read_until('User Name : ', timeout=self.timeout)
            self.tn.write(self.username + '\r')
            self.tn.read_until('Password : ', timeout=self.timeout)
            self.tn.write(self.password + ' -c\r')
            self.tn.read_until('APC> ', timeout=self.timeout)
            self.connected = True
        except Exception as err:
            # str(err) instead of err.message: the latter is frequently empty
            # (e.g. for socket errors) and does not exist on Python 3.
            print('Connection error: ' + str(err))
            self.connected = False
            # Do not leak the socket when the login dialogue failed half-way.
            if self.tn is not None:
                try:
                    self.tn.close()
                except Exception:
                    pass
                self.tn = None

    def close(self):
        '''Close the connection to the PDU (no-op when there is none).'''
        if self.tn is not None and self.connected:
            self.tn.close()
        self.connected = False

    def turn_on(self, outlet):
        '''Turn on a given outlet.'''
        self.trysend('on ' + str(outlet), "turn on/off")

    def turn_off(self, outlet):
        '''Turn off a given outlet.'''
        self.trysend('off ' + str(outlet), "turn on/off")

    def reboot(self, outlet):
        '''Reboot a given outlet, then block for 30 seconds.'''
        self.trysend('reboot ' + str(outlet), "reboot")
        time.sleep(30)

    def turn_all_on(self):
        '''Turn on all outlets.'''
        self.trysend('on all', "turn on/off")

    def turn_all_off(self):
        '''Turn off all outlets.'''
        self.trysend('off all', "turn on/off")

    def reboot_all(self):
        '''Reboot all outlets, then block for 30 seconds.'''
        self.trysend('reboot all', "reboot")
        time.sleep(30)

    def get_status(self):
        '''Get status of all outlets.

        :return: the reply lines of 'status all'; when the PDU cannot be
            reached, a list of eight placeholder lines (' 1: ?' ... ' 8: ?').
        '''
        st = self.trysend('status all', "get status")
        if not self.connected:
            st = [' 1: ?', ' 2: ?', ' 3: ?', ' 4: ?', ' 5: ?', ' 6: ?', ' 7: ?', ' 8: ?']
        return st

    def get_current(self):
        '''Get total current draw in Amps.

        :return: first reply line of 'current', or None when the command
            failed or produced no reply line.
        '''
        ans = self.trysend('current', "get current")
        return ans[0] if ans else None

    def get_power(self):
        '''Get total power load.

        :return: first reply line of 'power', or None when the command
            failed or produced no reply line.
        '''
        ans = self.trysend('power', "get power")
        return ans[0] if ans else None

    def trysend(self, cmd, msg):
        '''Send a command; on failure reconnect and try again exactly once.

        :param cmd: command text (without carriage return)
        :param msg: human-readable description used in the printed messages
        :return: list of reply lines, or None when both attempts failed
        '''
        try:
            ans = self.send(cmd)
        except Exception as err:
            self.attempts += 1
            self.close()
            self.connect()
            if self.attempts == 1:
                print("Attempt to " + msg + " failed. Now trying again ...")
                return self.trysend(cmd, msg)
            detail = getattr(err, 'strerror', None) or str(err)
            if detail:
                print("Attempt to " + msg + " failed: {0}".format(detail))
            else:
                print("Attempt to " + msg + " failed again.")
            # Reset the counter so that the NEXT command gets its own retry;
            # it used to stay at 2 and disable retries until a success.
            self.attempts = 0
            self.connected = False
            return None
        if self.attempts == 1:
            print('Success!')
        self.attempts = 0
        self.connected = True
        return ans

    def send(self, cmd):
        '''Sends a command (without carriage return) and returns the result
        as an array of strings.

        The reply is read up to the next 'APC> ' prompt and split on CRLF;
        the first two lines and the last one are dropped.

        :raises IOError: when there is no telnet session
        '''
        if self.tn is None:
            raise IOError('not connected to the PDU')
        self.tn.write(cmd + '\r')
        ans = self.tn.read_until('APC> ', timeout=self.timeout)
        return ans.split('\r\n')[2:-1]
# class IPrelay(): # '''Creates a interface for connecting and deconnecting the IP relay''' # def __init__(self, host, port=23, timeout=3): # '''IPrelay is an interface for controlling the IPrelay HWg-ER02b. # Inputs: host, '192.168.0.xx' # port, default is 23 # timeout, default is 3 seconds # ''' # # ##socket.setdefaulttimeout(timeout) # ## The first run is to close any previously opened connection # self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # self.is_closed = False # self.host = host # self.port = port # # try: # self.s.connect((host, port)) # except socket.timeout: # self.s.close() # self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # self.s.connect((host,port)) # ## Turn on the station # self.turnOn(cfg.station, flush=False) # # def turnOff(self, output): # '''turn0 turns the output port to 0. # input is the output port (0 or 1) # ''' # #try: # # self.s.recv(4098) # #except: # # print 'IP relay timed out' # if self.is_closed: # return # if output ==0: # self.s.send('\xff\xfa\x2c\x32\x20\xff\xf0') # elif output == 1: # self.s.send('\xff\xfa\x2c\x32\x21\xff\xf0') # # def turnOn(self, output, flush=True): # '''turn1 turns the output port to 1. 
# Input is the output port (0 or 1) # ''' # if self.is_closed: # return # if output==0: # if flush: # self.s.recv(4098) # self.s.send('\xff\xfa\x2c\x32\x10\xff\xf0') # # elif output==1: # if flush: # self.s.recv(4098) # self.s.send('\xff\xfa\x2c\x32\x11\xff\xf0') # # def close(self): # '''closes the connection with the IP relay and closes the relay''' # #self.turnOff(0) # self.turnOff(cfg.station) # self.s.close() # self.is_closed = True # # def connect(self): # '''closes the connection with the IP relay without closing the relay''' # self.is_closed = False # self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # self.s.connect((self.host,self.port)) # # def disconnect(self): # '''closes the connection with the IP relay without closing the relay''' # self.is_closed = True # self.s.close() # # # def areyouthere(self): # ''' routine to verify that the IPrelay is still here and responding. # Returns True is all is fine, else False # ''' # self.s.send('\xff\xf6') # time.sleep(0.1) # try: # resp = self.s.recv(1024) # return resp.find('<WEB51 HW 6.0 SW 3.1 SN 020701 #01>')>=0 # except socket.timeout: # return False
class Encoder(object):
    '''Class for communicating with the encoder using SSI protocol.

    SSI stands for synchronous Serial Interface. It means that in order to
    get any signal from the encoder, the computer has to send a continuous
    pulse of N-bits. The response from the encoder is 20 bits long:

    * the first 8 are the multiturn information,
    * the last 12 the singleturn information.

    The communication is made in BitBang mode through the FTDI ftd2xx
    library. When the library or the device is not available the instance
    stays closed (``is_closed`` is True) and ConverseWithEncoder returns
    ``(-1, 0)``.
    '''

    def __init__(self):
        '''Initialise the encoder instance.

        Requires the ftd2xx.dll to be in the working directory. No libusb is
        required. Looks for the device whose serial number equals
        ``cfg.encoder`` and, when found, opens and configures it.
        '''
        self.index = None
        self.is_closed = True
        try:
            self.ftlib = ct.windll.ftd2xx
        except (AttributeError, OSError):
            # AttributeError: ctypes has no ``windll`` outside Windows.
            # OSError (WindowsError on Windows): the DLL could not be loaded.
            return
        gype = ct.c_ulong()
        locId = ct.c_ulong()
        Id = ct.c_ulong()
        serialNumber = ct.create_string_buffer(16)
        description = ct.create_string_buffer(64)
        flag = ct.c_ulong()
        # FT_HANDLE is pointer sized; a c_ulong is too small on 64-bit Windows.
        hand = ct.c_void_p()
        # The device count is written as a DWORD (32 bits); a c_ushort buffer
        # would be overrun by two bytes.
        ndev = ct.c_ulong()
        FT_OK = self.ftlib.FT_CreateDeviceInfoList(ct.byref(ndev))
        if ndev.value > 0 and FT_OK == 0:
            for index in range(ndev.value):
                FT_OK = self.ftlib.FT_GetDeviceInfoDetail(
                    index, ct.byref(flag), ct.byref(gype), ct.byref(Id),
                    ct.byref(locId), serialNumber, description, ct.byref(hand))
                if serialNumber.value == cfg.encoder and FT_OK == 0:
                    self.index = index
                    self.hand = hand
                    self.openEncoder()
                    if not self.is_closed:
                        # Only configure a device that actually opened.
                        self.setTimeout(timeout=20)
                        self.setBaudrate(baudrate=460800)
                        self.setBitMode(bitmode=4)
                    break
            if self.index is None:
                print('the encoder was not found')
        else:
            print('No Serial device connected to computer')

    def setBitMode(self, bitmode=4):
        '''Sets the special bitmode for conversing with the encoder.

        * Normal bitmode is 0
        * Asynchronous Bitbang is 1
        * Synchronous Bitbang is 4

        The pin mask passed to the library is always 1.
        '''
        mode = ct.c_ubyte(bitmode)
        mask = ct.c_ubyte(1)
        FT_OK = self.ftlib.FT_SetBitMode(self.hand, mask, mode)
        if FT_OK != 0:
            print('Could not set the bitmode, error %g' % FT_OK)

    def setTimeout(self, timeout=20):
        '''Sets both the read and the write timeout, in milliseconds.'''
        tout = ct.c_ulong(timeout)
        self.ftlib.FT_SetTimeouts(self.hand, tout, tout)

    def openEncoder(self):
        '''Opens the connection with the serial encoder.

        ``is_closed`` becomes False only when FT_Open reports success;
        otherwise the error code is printed and the instance stays closed.
        '''
        FT_OK = self.ftlib.FT_Open(self.index, ct.byref(self.hand))
        if FT_OK != 0:
            print('Opening connection with encoder failed, error %g' % FT_OK)
            return
        self.is_closed = False

    def setBaudrate(self, baudrate=460800):
        '''Sets the baudrate for the communication.

        The encoder needs quite a high baudrate to even spit an answer.
        Default baudrate is 460800. Better not change it.
        '''
        Baudrate = ct.c_ulong(baudrate)
        FT_OK = self.ftlib.FT_SetBaudRate(self.hand, Baudrate)
        if FT_OK != 0:
            print('Setting the baudrate failed, error %g' % FT_OK)

    def purge(self):
        '''Purge the buffer of the input and output pins (purge mask 3).'''
        clearbuff = ct.c_ulong(3)
        FT_OK = self.ftlib.FT_Purge(self.hand, clearbuff)
        if FT_OK != 0:
            print('Did not purge buffers, error %g' % FT_OK)

    def WritetoEncoder(self):
        '''Sending the pulse signal to the encoder.

        The buffers are purged first, then 57 bytes of the alternating
        1/0 pattern (the clock pulses) are written.
        '''
        buff = ct.create_string_buffer('\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00')
        written = ct.c_ulong()
        self.purge()
        FT_OK = self.ftlib.FT_Write(self.hand, buff, 57, ct.byref(written))
        if FT_OK != 0:
            print('Writing to buffer failed, error %g' % FT_OK)

    @staticmethod
    def _collectBits(buf, firstBit, nbits):
        '''Assemble ``nbits`` data bits, most significant first.

        Data bit k is bit 1 (mask 2) of byte ``2 * k`` of the read buffer.
        '''
        value = 0
        for k in range(firstBit, firstBit + nbits):
            value = (value << 1) | ((ord(buf[2 * k]) >> 1) & 1)
        return value

    def ReadEncoder(self):
        '''Reads the response of the encoder.

        The response is already translated in the information multi and
        single turns.

        :return mturns: the number of full turns (8 bits)
        :return sturns: the position within the turn, in whole degrees
            (12-bit count * 360 // 4096). ``(0, 0)`` is returned when the
            read fails.
        '''
        read = ct.create_string_buffer(64)
        rbytes = ct.c_ulong()
        FT_OK = self.ftlib.FT_Read(self.hand, read, 64, ct.byref(rbytes))
        if FT_OK != 0:
            print('Read the encoder failed, error %g' % FT_OK)
            return 0, 0
        multiturn = self._collectBits(read, 6, 8)
        singleturn = self._collectBits(read, 14, 12)
        # NOTE(review): floor division reproduces the Python 2 integer
        # arithmetic of the original expression and drops the sub-degree
        # part of the 12-bit reading -- confirm this is intended.
        return multiturn, singleturn * 360 // 4096

    def ConverseWithEncoder(self):
        '''Wrapper around WritetoEncoder and ReadEncoder.

        :return: ``(-1, 0)`` when the encoder is closed, otherwise the
            tuple returned by ReadEncoder.
        '''
        if self.is_closed:
            return -1, 0
        self.WritetoEncoder()
        return self.ReadEncoder()

    def closeEncoder(self):
        '''Closes the connection to the encoder (no-op when already closed).'''
        if not self.is_closed:
            self.ftlib.FT_Close(self.hand)
            self.is_closed = True
class RHSensor(object):
    """
    Class for humidity/temperature sensor.

    The sensor is reached through a serial port (9600 baud, no parity, one
    stop bit). Commands are short ASCII strings terminated by a carriage
    return: 'H' (humidity), 'C' (temperature), 'ENQ' (sensor information).
    """
    ser = None            # serial connection, None until a port was opened
    serialNumber = None   # serial number of the sensor

    def __init__(self, **kwargs):
        """
        Initialize and open the connection with the humidity and
        temperature sensor.

        Keywords:
            - port= 'COM3'
            - idVendor = '0403'
            - idProduct = '6001'
            - serialNumber = 'FDXXXX'
            - timeout = 2

        Without a serial number the candidate ports returned by findCOMPort
        are tried in turn and the first one that opens is kept. With a serial
        number the port is looked up with matchID2Port.
        """
        port = kwargs.pop('port', None)
        serialNumber = kwargs.pop('serialNumber', None)
        idVendor = kwargs.pop('idVendor', '0403')
        idProduct = kwargs.pop('idProduct', '6001')
        timeout = kwargs.pop('timeout', 2)

        self.port = port
        if idVendor:
            self.idVendor = idVendor
        if idProduct:
            self.idProduct = idProduct
        if timeout:
            self.timeout = timeout
        if serialNumber:
            self.serialNumber = serialNumber

        if not self.serialNumber:
            # findCOMPort may return None; treat that as "no candidates".
            ## If more than one sensor is connected to the computer:
            for m in (self.findCOMPort() or []):
                self.port = m[0]
                self.serialNumber = m[1]
                print('testing port %s ' % m[0])
                try:
                    self.ser = openSerial(port=self.port, baudrate=9600,
                                          parity=serial.PARITY_NONE, xonxoff=False,
                                          stopbits=serial.STOPBITS_ONE, timeout=timeout)
                    print('connected to port %s' % m[0])
                    self.name = self._locationName(self.serialNumber)
                    break
                except serial.SerialException:
                    print('Port %s already in use' % m[0])
        else:
            # An unknown serial number used to leave self.name undefined and
            # crash on the print below; it now gets the same 'nolocation'
            # fallback as the auto-detection path.
            self.name = self._locationName(self.serialNumber)
            print('Sensor name: ' + self.name)
            self.port = matchID2Port(serial=self.serialNumber)
            print(self.port)
            self.ser = openSerial(port=self.port, baudrate=9600,
                                  parity=serial.PARITY_NONE,
                                  xonxoff=False, stopbits=1, timeout=timeout)

    @staticmethod
    def _locationName(serialNumber):
        """Map a sensor serial number to its location name.

        NOTE(review): the mapping is reproduced as found (cfg.RHoutside ->
        'dome', cfg.RHinDome -> 'electronics_box'); confirm the labels.
        """
        if serialNumber == cfg.RHoutside:
            return 'dome'
        if serialNumber == cfg.RHinDome:
            return 'electronics_box'
        if serialNumber == cfg.RHinEnclosure:
            return 'enclosure'
        return 'nolocation'

    def findCOMPort(self, inport='COM'):
        """Find the ports whose device ID carries this sensor's vendor and
        product IDs.

        :return: list of tuples (port name, serial number), or None when
            matchPort2ID returns None
        :raises Exception: on an unsupported operating system or when no
            matching device is found
        """
        from os import name
        nisma = matchPort2ID(inport=inport, startstr='FTDIBUS\\')
        if nisma is None:
            return None
        ports, serID = nisma
        rhports = None
        if name == 'nt':  # for Windows 7
            rhports = [(p[0], s) for p, s in zip(ports, serID)
                       if 'VID_' + self.idVendor in p[1] and 'PID_' + self.idProduct in p[1]]
        if name == 'posix':  # for Mac
            # NOTE(review): p[2] is read here although matchPort2ID builds
            # 2-tuples -- this branch looks unreachable in practice; verify.
            rhports = [(p[0], s) for p, s in zip(ports, serID)
                       if 'VID:PID=' + self.idVendor.lstrip('0') + ':' + self.idProduct.lstrip('0') in p[2]]
        if rhports is None:
            raise Exception('Sorry ... Unsupported operating system')
        if len(rhports) == 0:
            raise Exception('Cannot find the humidity/temperature sensor. Please check that it is plugged in and that Vendor ID = ' + self.idVendor + ' and Product ID = ' + self.idProduct)
        return rhports

    def isOpen(self):
        """Checks if the serial connection is opened or closed"""
        return self.ser.isOpen()

    def open(self):
        """Opens the serial connection if it was closed"""
        if not isSerialOpen(self.ser):
            self.ser.open()

    def close(self):
        """Closes the serial connection if it is open"""
        if isSerialOpen(self.ser):
            self.ser.close()

    def _ask(self, command):
        """Send ``command`` and poll for a non-empty reply.

        Polls every 0.1 s until something is read or ``self.timeout`` seconds
        have elapsed.

        :return: the reply string, or None (after a warning) when the
            timeout was exceeded
        """
        from time import time, sleep
        writeSerial(self.ser, command)
        sleep(0.1)
        t0 = time()
        t1 = time()
        data = readSerial(self.ser)
        while data == '' and t1 < t0 + self.timeout:
            sleep(0.1)
            data = readSerial(self.ser)
            t1 = time()
        if t1 > t0 + self.timeout:
            warnings.warn('Operation timed out (' + str(t1 - t0) + '). Read data: ' + data)
            return None
        return data

    @staticmethod
    def _parseReading(data, signed=False):
        """Extract the leading decimal number (one decimal place) from a reply.

        Up to three integer digits are accepted so that '100.0' is not
        rejected; with ``signed`` a leading minus sign is accepted too.

        :return: the matched text, or '' when the reply does not start with
            such a number
        """
        import re
        pattern = r'^-?\d{1,3}\.\d' if signed else r'^\d{1,3}\.\d'
        found = re.search(pattern, data)
        return found.group(0) if found else ''

    def getHumidity(self):
        """Reads humidity from sensor

        Returns:
            - the humidity as a string ('' on timeout or unparsable reply)
        """
        data = self._ask('H' + chr(13))
        if data is None:
            return ''
        return self._parseReading(data)

    def getTemperature(self):
        """Reads temperature from sensor

        Returns:
            - the temperature as a string ('' on timeout or unparsable
              reply); negative values keep their minus sign
        """
        data = self._ask('C\r')
        if data is None:
            return ''
        return self._parseReading(data, signed=True)

    def getAll(self):
        """Reads temperature, humidity and device status.

        :return tuple (temp, hum, status): where
            - temp, the temperature as a string
            - hum, the humidity as a string
            - status, the value returned by getStatus

        When the serial port raises a SerialException the tuple
        (0.0, 0.0, 'Denied') is returned instead.
        """
        try:
            return (self.getTemperature(), self.getHumidity(), self.getStatus())
        except serial.SerialException:
            return (0.0, 0.0, 'Denied')

    def _recordSeries(self, reader, axisLabel, dt, npts, verbose, plotData):
        """Sample ``reader()`` ``npts`` times, optionally printing/plotting.

        A reading that cannot be converted to float leaves a 0 in the array.

        :return tuple (tim, values): two numpy float arrays of length npts
        """
        import time
        from numpy import zeros
        tim = zeros(npts, dtype=float)
        values = zeros(npts, dtype=float)
        if plotData:
            from pylab import ion, figure, ioff, gca, show, pause, plot, draw, xlabel, ylabel
            ion()
            figure()
            line, = plot([], [], 'b+')
            xlabel('Time (in seconds)')
            ylabel(axisLabel)
            ax = gca()
        for j in range(npts):
            try:
                tim[j] = time.time()
                values[j] = float(reader())
            except ValueError:
                pass
            time.sleep(dt)
            if verbose:
                print(str(tim[j]) + ' ' + str(values[j]) + '- ' + str(int(float(j + 1) / float(npts) * 100)) + "% completed")
            if plotData:
                line.set_xdata(tim[0:j + 1] - tim[0])
                line.set_ydata(values[0:j + 1])
                ax.relim()
                ax.autoscale_view()
                draw()
                pause(0.01)
        if plotData:
            ioff()
            show()
        return (tim, values)

    def getCHumidity(self, dt=0, npts=100, verbose=False, plotData=False):
        """Gets a continuous reading of the humidity.

        :param int dt: time interval to wait in seconds between two
            measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: plot the samples live with pylab
        :returns tuple (t,hum): t, time as a float, and hum, humidity as a float
        """
        return self._recordSeries(self.getHumidity, 'Humidity (in %)',
                                  dt, npts, verbose, plotData)

    def getCTemperature(self, dt=0, npts=100, verbose=False, plotData=False):
        """
        Gets a continuous reading of the temperature.

        :param int dt: time interval to wait in seconds between two
            measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: plot the samples live with pylab
        :return tuple (t,temp): t, time as a float, and temp, temperature as a float
        """
        return self._recordSeries(self.getTemperature, 'Temperature (in Celsius)',
                                  dt, npts, verbose, plotData)

    def getCAll(self, dt=0, npts=100, verbose=False, plotData=False):
        """
        Gets a continuous reading of the humidity and temperature.

        :param int dt: time interval to wait in seconds between two
            measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: accepted for symmetry with the other
            continuous readers; no plot is produced by this method
        :return tuple (t,hum,temp): with:
            - t: time as a float
            - hum: humidity as a float
            - temp: temperature as a float
        """
        import time
        from numpy import zeros
        tim = zeros(npts, dtype=float)
        temp = zeros(npts, dtype=float)
        hum = zeros(npts, dtype=float)
        for j in range(npts):
            try:
                tim[j] = time.time()
                # Temperature first: if it cannot be parsed the humidity is
                # not requested for this sample.
                temp[j] = float(self.getTemperature())
                hum[j] = float(self.getHumidity())
            except ValueError:
                pass
            time.sleep(dt)
            if verbose:
                print(str(tim[j]) + ' ' + str(hum[j]) + ' ' + str(temp[j]) + '- ' + str(int(float(j + 1) / float(npts) * 100)) + "% completed")
        return (tim, hum, temp)

    def getStatus(self):
        '''
        Read from WMI the status of the device connected to a specific port.
        Answers are:

        - OK
        - Error
        - Degraded
        - Unknown
        - PredFail
        - Starting
        - Stopping
        - NonRecover
        - No Contact
        - Lost Comm

        Returns None when no USB device ID contains the sensor's serial
        number. Requires the ``wmi`` package (Windows only).
        '''
        import wmi
        u = wmi.WMI()
        cu = u.Win32_USBControllerDevice()
        for c in cu:
            tmp = c.Dependent.DeviceID
            if tmp.find(self.serialNumber) > 0:
                return c.Dependent.Status

    def getInfo(self):
        """Reads sensor model number and firmware version from
        humidity/temperature sensor.

        :return: the raw reply to 'ENQ', or '' on timeout
        """
        data = self._ask('ENQ\r')
        if data is None:
            return ''
        return data
def listallports():
    ''' Counts the USB controller/device associations reported by WMI.

    :return: the number of entries (an int), not the entries themselves
    '''
    import wmi
    u = wmi.WMI()
    cu = u.win32_USBControllerDevice()
    return len(cu)


def portFromCaption(caption, inport='COM'):
    ''' Extracts the port name (e.g. 'COM12') from a WMI device caption.

    The port name is ``inport`` followed by all the digits after it, so
    two-digit ports are no longer truncated to four characters. When no
    digit follows, the four characters starting at ``inport`` are returned.

    :return: the port name as a str, or None when ``inport`` is absent or
        sits at the very start of the caption
    '''
    import re
    startport = caption.find(inport)
    if startport <= 0:
        return None
    found = re.match(re.escape(inport) + r'\d+', caption[startport:])
    if found:
        return str(found.group(0))
    return str(caption[startport:startport + 4])


def matchPort2ID(inport='COM', startstr="FTDIBUS"):
    ''' For devices which are connected to the computer, matchPort2ID
    matches the virtual serial port with the device serial number given
    by the provider.

    :param inport: prefix of the port names to look for in the captions
    :param startstr: prefix a device ID must start with to be considered
    :return tuple (port, serID): ``port`` is a list of tuples
        (port name, text of the device ID before '+N'), ``serID`` the list
        of the matching serial numbers
    '''
    import wmi
    u = wmi.WMI()
    cu = u.Win32_USBControllerDevice()
    port = []
    serID = []
    for item in cu:
        if not str(item.Dependent.DeviceID).startswith(startstr):
            continue
        tmpport = portFromCaption(item.Dependent.Caption, inport)
        if tmpport is None:
            continue
        # Drop the prefix by length; lstrip() strips a character SET and
        # could eat the beginning of the ID as well.
        tmpo = item.Dependent.DeviceID[len(startstr):]
        startID = tmpo.find('+N')
        # NOTE(review): the serial number is cut at the first 'A' of the ID;
        # a vendor/product ID containing an 'A' would break this -- verify.
        endID = tmpo.find('A')
        port.append((tmpport, str(tmpo[0:startID])))
        serID.append(str(tmpo[startID + 1:endID]))
    return port, serID


def matchID2Port(serial=None):
    ''' Matches a serial port to the given serial Number.

    :param serial: serial number to look for inside the USB device IDs
    :return: the 'COM' port name of the first matching device whose caption
        names a port, or None when there is none
    '''
    import wmi
    u = wmi.WMI()
    cu = u.win32_USBControllerDevice()
    for item in cu:
        if str(item.Dependent.DeviceID).find(serial) > 0:
            tmpport = portFromCaption(item.Dependent.Caption, 'COM')
            if tmpport is not None:
                return tmpport


def listSerialPorts():
    """
    Lists all existing serial connections

    Args:
        None

    Returns:
        list of tuples (portstr,descr,info) with
        portstr: Port name
        descr: Description
        info: Additional information that might include vendor ID and product ID
    """
    from serial.tools.list_ports import comports
    return list(comports())


def isSerialOpen(ser):
    """
    Checks if a serial connection is open

    Args:
        - ser: the serial connection to check

    Returns:
        - True if the serial connection is open and False if it is not open

    Raises:
        - TypeError if ``ser`` is not a serial.Serial instance
    """
    # serial.Serial is the platform implementation selected by pyserial, so
    # isinstance replaces the comparison against the class repr strings.
    if not isinstance(ser, serial.Serial):
        raise TypeError('This is not a serial connection')
    return ser.isOpen()


def openSerial(**kwargs):
    """
    opens a serial connection

    Args:
        port: the name of the port to open (default = cfg.defaultPort)
        baudrate: the baudrate (default = cfg.defaultBaudrate)
        parity: the parity (default = cfg.defaultParity)
        stopbits: the stopbits (default = cfg.defaultStops)
        xonxoff: (default = cfg.defaultXonXoff)
        timeout: the timeout (default = cfg.defaultTimeout)

    Returns:
        The serial connection if successfully open

    Raises:
        Exception if the port does not report open after opening it;
        whatever ``serial.Serial.open`` raises is propagated as well
    """
    import time
    port = kwargs.pop('port', cfg.defaultPort)
    baudrate = kwargs.pop('baudrate', cfg.defaultBaudrate)
    parity = kwargs.pop('parity', cfg.defaultParity)
    stopbits = kwargs.pop('stopbits', cfg.defaultStops)
    xonxoff = kwargs.pop('xonxoff', cfg.defaultXonXoff)
    timeout = kwargs.pop('timeout', cfg.defaultTimeout)

    # Configure first, open afterwards.
    ser = serial.Serial()
    ser.port = port
    print('port selected')
    ser.baudrate = baudrate
    ser.parity = parity
    ser.stopbits = stopbits
    ser.xonxoff = xonxoff
    ser.timeout = timeout

    print('Is serial port %s open? %g' % (port, ser.isOpen()))
    if isSerialOpen(ser):
        print('Cannot open the port %s. Closing it' % port)
        ser.close()
        time.sleep(1)
        print(ser.isOpen())
    ser.open()
    print('connected to port')
    if isSerialOpen(ser):
        return ser
    raise Exception("The serial connection could not be opened")


def closeSerial(ser):
    """
    Closes a serial connection

    Args:
        - ser: the serial connection to close

    Returns:
    """
    if isSerialOpen(ser):
        ser.close()


def writeSerial(ser, cmd):
    """
    writes a command to a serial connection

    Args:
        - ser: the serial connection to write to
        - cmd: the command to write (typically ending by '\\r')

    Returns:
        - the number of bytes written

    Raises:
        - Exception if the connection is closed
    """
    if not isSerialOpen(ser):
        raise Exception("The serial connection is closed or not valid, you cannot write to it")
    return ser.write(cmd)


def readSerial(ser):
    """
    reads from a serial connection

    Characters are read one at a time until a read returns nothing (i.e.
    the port timeout expired); carriage returns and line feeds are dropped.

    Args:
        ser: the serial connection to read from

    Returns:
        the string read from the device

    Raises:
        Exception if the connection is closed
    """
    if not isSerialOpen(ser):
        raise Exception("The serial connection is closed or not valid, you cannot read from it")
    received = []
    c = ser.read(size=1)
    while c:
        if c != '\r' and c != '\n':
            received.append(c)
        c = ser.read(size=1)
    return ''.join(received)