"""
.. module:: mascara_electronics
.. moduleauthor:: Julien Spronck, Anna-Lea Lesage
.. created:: Feb 18, 2014
.. version:: 0.1
.. last change:: May 14. 2014 by Anna-Lea.
.. change history:
Added the function matchPort2ID to fetch the serial number of the
connected device. Implemented this in findCOMPort. The RHSensor
instance now delivers a serial number. -- Apr 2, 2014
Added listallport() to list all the ports in use on the computer -- Apr 3, 2014
Changed RHSensor.__init__() to run through the available ports and find
the RH sensors. Now 2 sensors can be connected on random ports and found. -- Apr 4, 2014
Added PDU class
This module is for external communication with devices such as humidity/temperature sensor, relays, leds and PDUs
Example:
To read the humidity sensor
``
>>> import Maselec
>>> rh = Maselec.RHSensor()
>>> rh.getHumidity() # relative humidity in %
'25.7'
>>> rh.getTemperature() # temperature in Celsius
'21.1'
>>> rh.serialNumber
'N14010707A'
``
"""
import serial
import warnings
import ctypes as ct
import socket
import time
import mascara_config as cfg
import mascara_credentials as creds
import telnetlib
class OnOffObject(object):
    '''Base class for the two-state devices (relays, heaters, LEDs) that are
    driven through one digital output channel of the DAQ.

    Unless ``inversed`` is set, writing 0 to the channel switches the device
    on and writing 1 switches it off. The last commanded state is remembered
    in ``self.state`` ('on' or 'off'); it is not read back from the hardware.
    '''

    def __init__(self, daq, channel, inversed=False):
        ''' Initialization of the device; it is switched off immediately.

        :param daq: an instance of the DAQ class representing the DAQ
        :param channel: digital channel of the object
            *see :ref:`mascara_config` for existing channels*
        :keyword bool inversed: swap the output levels (1 = on, 0 = off)
        '''
        self.daq = daq
        self.channel = channel
        self.inversed = inversed
        self.state = 'off'
        self.turnOff()

    def turnOn(self):
        '''Switches on and records the new state.
        '''
        level = 1 if self.inversed else 0
        self.daq.dBitOut(self.channel, level)
        self.state = 'on'

    def turnOff(self):
        '''Switches off and records the new state.
        '''
        level = 0 if self.inversed else 1
        self.daq.dBitOut(self.channel, level)
        self.state = 'off'

    def isOn(self):
        '''Is the switch on (according to the last command sent)?
        '''
        return 'on' == self.state

    def isOff(self):
        '''Is the switch off (according to the last command sent)?
        '''
        return 'off' == self.state
class Relay(OnOffObject):
    '''On/off device representing a relay; adds nothing to OnOffObject.
    '''

    def __init__(self, daq, channel, inversed=False):
        # Forward everything unchanged to the generic on/off implementation.
        super(Relay, self).__init__(daq, channel, inversed=inversed)
class CameraRelay(OnOffObject):
    '''On/off device representing a camera relay; adds nothing to
    OnOffObject.
    '''

    def __init__(self, daq, channel, inversed=False):
        # Forward everything unchanged to the generic on/off implementation.
        super(CameraRelay, self).__init__(daq, channel, inversed=inversed)
class Heater(OnOffObject):
    '''On/off device representing a heater. The output is never inversed.
    '''

    def __init__(self, daq, channel):
        # No ``inversed`` option here: the base-class default (False) applies.
        super(Heater, self).__init__(daq, channel)
class LEDS(OnOffObject):
    '''On/off device representing the LEDs. The output is never inversed.
    '''

    def __init__(self, daq, channel):
        # No ``inversed`` option here: the base-class default (False) applies.
        super(LEDS, self).__init__(daq, channel)
class LimitSwitch(object):
    '''A limit switch that is driven by one digital output of the DAQ and
    sensed on one analog input.
    '''

    def __init__(self, daq, channelDOut, channelAIn):
        '''Initialization of the limit switch; the digital output is set to 1.

        :param daq: an instance of the DAQ class representing the DAQ
        :param channelDOut: channel of the digital output of the limit switch
        :param channelAIn: channel of the analog input of the limit switch
        '''
        self.daq = daq
        self.channelDOut = channelDOut
        self.channelAIn = channelAIn
        self._driveOutput()

    def _driveOutput(self):
        '''Write 1 to the digital output channel of the switch.'''
        self.daq.dBitOut(self.channelDOut, 1)

    def isPushed(self):
        '''Return True when the analog input reads below 2.5 V.

        The digital output is written again before every measurement.
        '''
        self._driveOutput()
        voltage = self.daq.vIn(self.channelAIn)
        return voltage < 2.5
class PowerSupply(object):
    """ Class for the power supplies.

    Set-points are written to the DAQ analog outputs as a 0-5 V signal and
    the actual output is read back from DAQ analog inputs as a 0-10 V signal,
    both scaled to ``vMax`` (volts) and ``cMax`` (amps).
    """

    def __init__(self, daq, ID):
        """ Initialize the power supply with voltage and current set to 0.

        :param daq: an instance from the DAQ class
        :param ID: the ID of the power supply (1 or 2)
        :raises ValueError: if ID is neither 1 nor 2
        """
        # A plain range check would let values such as 1.5 through and leave
        # the read-back channels undefined, so test membership instead.
        if ID not in (1, 2):
            raise ValueError('ID can only be 1 or 2 (which of the two power supplies)')
        self.id = ID
        self.vMax = 32.   # How many Volts can it deliver?
        self.cMax = 20.   # How many amps can it deliver?
        self.chVIn = cfg.chPSVoltage   # channel for input voltage
        self.chCIn = cfg.chPSCurrent   # channel for input current
        self.daq = daq
        if ID == 1:
            self.chVOut = cfg.chPS1VoltageOut   # channel for output voltage
            self.chCOut = cfg.chPS1CurrentOut   # channel for output current
        else:
            self.chVOut = cfg.chPS2VoltageOut   # channel for output voltage
            self.chCOut = cfg.chPS2CurrentOut   # channel for output current
        self.setVoltage(0)
        self.setCurrent(0)

    def getVoltage(self):
        """ Gets output voltage of the power supply (in Volts) """
        # There should be voltage dividers between the output of the power
        # supply and the daq
        return self.daq.vIn(self.chVOut)/10.*self.vMax

    def getCurrent(self):
        """ Gets output current of the power supply (in Amps) """
        # There should be voltage dividers between the output of the power
        # supply and the daq
        return self.daq.vIn(self.chCOut)/10.*self.cMax

    def setVoltage(self, V):
        """ Sets output voltage of the power supply.

        The request is clamped to the range 0-24 V.
        Do NOT go higher than 24V, the peltiers will not like it
        """
        # There should be voltage doublers between the input of the power
        # supply and the daq
        if V < 0:
            V = 0
        if V > 24:
            # DO NOT put more than 24V out of the power supply (it can go to
            # 32V). The peltiers won't like it
            V = 24.
        self.daq.vOut(self.chVIn, V/self.vMax*5.)

    def setCurrent(self, C):
        """ Sets output current of the power supply, clamped to 0-cMax """
        # There should be voltage doublers between the input of the power
        # supply and the daq
        if C < 0:
            C = 0
        if C > self.cMax:
            C = self.cMax
        self.daq.vOut(self.chCIn, C/self.cMax*5.)

    def turnOn(self, V=24, C=20):
        """Sets the voltage and the current of the power supply.

        :param V: voltage set-point in Volts (default 24)
        :param C: current set-point in Amps (default 20)
        """
        self.setVoltage(V)
        self.setCurrent(C)

    def turnOff(self):
        """Sets the current of the power supply to 0 A.

        NOTE(review): only the current set-point is changed; the voltage
        set-point is left as it was -- confirm this is intended.
        """
        self.setCurrent(0)
class PDU(object):
    '''Telnet client for a PDU whose command prompt is ``APC> ``.

    Commands are sent as text lines; replies are returned as lists of
    strings. ``self.connected`` tells whether the last login or command
    succeeded.
    '''

    def __init__(self, ip, username= creds.pdu_username, password= creds.pdu_password, timeout= 5):
        '''Initialization of the PDU. The PDUs are accessed using Telnet
        protocol, thus they have each a username and password to access the
        PDU. A connection attempt is made immediately.

        :param ip: the IP address of the PDU
        :keyword str username: the identifier of the pdu
        :keyword str password: the password to access the pdu
        :keyword int timeout: timeout for reading the PDU buffer
        '''
        self.ip = ip
        self.username = username
        self.password = password
        self.timeout = timeout
        self.tn = None
        self.connected = False
        self.attempts = 0
        self.connect()

    def connect(self):
        '''Connect to the PDU using telnet and log in.

        Failures are reported on stdout and leave ``self.connected`` False;
        no exception is propagated.
        '''
        try:
            self.tn = telnetlib.Telnet(self.ip, timeout=self.timeout)
            self.tn.read_until('User Name : ', timeout=self.timeout)
            self.tn.write(self.username+'\r')
            self.tn.read_until('Password : ', timeout=self.timeout)
            self.tn.write(self.password+' -c\r')
            self.tn.read_until('APC> ', timeout=self.timeout)
            self.connected = True
        except Exception as err:
            # str(err) instead of err.message: the latter is deprecated and
            # empty for exceptions built from several arguments.
            print('Connection error: '+str(err))
            # Do not keep a half-open session around after a failed login.
            self.close()

    def close(self):
        '''Close the connection to the PDU, if there is one.
        '''
        # Keyed on the session object rather than on ``connected`` so that a
        # session is also released when a command failure cleared the flag.
        session, self.tn = self.tn, None
        self.connected = False
        if session is not None:
            session.close()

    def turn_on(self, outlet):
        '''Turn on a given outlet
        '''
        self.trysend('on '+str(outlet), "turn on/off")

    def turn_off(self, outlet):
        '''Turn off a given outlet
        '''
        self.trysend('off '+str(outlet), "turn on/off")

    def reboot(self, outlet):
        '''Reboot a given outlet, then wait 30 seconds
        '''
        self.trysend('reboot '+str(outlet), "reboot")
        time.sleep(30)

    def turn_all_on(self):
        '''Turn on all outlets
        '''
        self.trysend('on all', "turn on/off")

    def turn_all_off(self):
        '''Turn off all outlets
        '''
        self.trysend('off all', "turn on/off")

    def reboot_all(self):
        '''Reboot all outlets, then wait 30 seconds
        '''
        self.trysend('reboot all', "reboot")
        time.sleep(30)

    def get_status(self):
        '''Get status of all outlets.

        :return: the reply lines, or eight ``' n: ?'`` placeholders when the
            PDU is not connected
        '''
        st = self.trysend('status all', "get status")
        if not self.connected:
            st = [' 1: ?',' 2: ?',' 3: ?',' 4: ?',' 5: ?',' 6: ?',' 7: ?',' 8: ?']
        return st

    def get_current(self):
        '''Get total current draw in Amps.

        :return: first line of the reply, or None if the command failed
        '''
        return self._firstLine(self.trysend('current', "get current"))

    def get_power(self):
        '''Get total power load.

        :return: first line of the reply, or None if the command failed
        '''
        return self._firstLine(self.trysend('power', "get power"))

    @staticmethod
    def _firstLine(ans):
        '''Return the first reply line, or None for a missing/empty reply.'''
        return ans[0] if ans else None

    def trysend(self, cmd, msg):
        '''Tries sending a command and try again once if it failed.

        After every failure the connection is closed and reopened. The
        failure counter is only cleared by a successful command, so as long
        as the PDU keeps failing, later calls give up after a single attempt.

        :param str cmd: command to send (without carriage return)
        :param str msg: description of the action, used in the messages
        :return: the reply as a list of strings, or None on failure
        '''
        try:
            ans = self.send(cmd)
        except Exception as err:
            self.attempts += 1
            self.close()
            self.connect()
            if self.attempts == 1:
                print("Attempt to "+msg+" failed. Now trying again ...")
                return self.trysend(cmd, msg)
            try:
                print("Attempt to "+msg+" failed: {0}".format(err.strerror))
            except AttributeError:
                # Not an OS-level error: there is no strerror to show.
                print("Attempt to "+msg+" failed again.")
            self.connected = False
            return None
        if self.attempts == 1:
            print('Success!')
        self.attempts = 0
        self.connected = True
        return ans

    def send(self, cmd):
        '''Sends a command (without carriage return) and returns the result
        as an array of strings.

        The first two lines and the last line of the raw reply are dropped.
        '''
        self.tn.write(cmd+'\r')
        ans = self.tn.read_until('APC> ', timeout=self.timeout)
        return ans.split('\r\n')[2:-1]
# class IPrelay():
# '''Creates a interface for connecting and deconnecting the IP relay'''
# def __init__(self, host, port=23, timeout=3):
# '''IPrelay is an interface for controlling the IPrelay HWg-ER02b.
# Inputs: host, '192.168.0.xx'
# port, default is 23
# timeout, default is 3 seconds
# '''
#
# ##socket.setdefaulttimeout(timeout)
# ## The first run is to close any previously opened connection
# self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.is_closed = False
# self.host = host
# self.port = port
#
# try:
# self.s.connect((host, port))
# except socket.timeout:
# self.s.close()
# self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.s.connect((host,port))
# ## Turn on the station
# self.turnOn(cfg.station, flush=False)
#
# def turnOff(self, output):
# '''turn0 turns the output port to 0.
# input is the output port (0 or 1)
# '''
# #try:
# # self.s.recv(4098)
# #except:
# # print 'IP relay timed out'
# if self.is_closed:
# return
# if output ==0:
# self.s.send('\xff\xfa\x2c\x32\x20\xff\xf0')
# elif output == 1:
# self.s.send('\xff\xfa\x2c\x32\x21\xff\xf0')
#
# def turnOn(self, output, flush=True):
# '''turn1 turns the output port to 1.
# Input is the output port (0 or 1)
# '''
# if self.is_closed:
# return
# if output==0:
# if flush:
# self.s.recv(4098)
# self.s.send('\xff\xfa\x2c\x32\x10\xff\xf0')
#
# elif output==1:
# if flush:
# self.s.recv(4098)
# self.s.send('\xff\xfa\x2c\x32\x11\xff\xf0')
#
# def close(self):
# '''closes the connection with the IP relay and closes the relay'''
# #self.turnOff(0)
# self.turnOff(cfg.station)
# self.s.close()
# self.is_closed = True
#
# def connect(self):
# '''closes the connection with the IP relay without closing the relay'''
# self.is_closed = False
# self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.s.connect((self.host,self.port))
#
# def disconnect(self):
# '''closes the connection with the IP relay without closing the relay'''
# self.is_closed = True
# self.s.close()
#
#
# def areyouthere(self):
# ''' routine to verify that the IPrelay is still here and responding.
# Returns True is all is fine, else False
# '''
# self.s.send('\xff\xf6')
# time.sleep(0.1)
# try:
# resp = self.s.recv(1024)
# return resp.find('<WEB51 HW 6.0 SW 3.1 SN 020701 #01>')>=0
# except socket.timeout:
# return False
class Encoder(object):
    '''Class for communicating with the #$% encoder using SSI protocol.

    SSI stands for synchronous Serial Interface. It means that in order
    to get any signal from the encoder, the computer has to send a continuous
    pulse of N-bits. The response from the encoder is 20 bits long:

    * the first 8 are the multiturn information,
    * the last 12 the singleturn information.

    The communication is made in BitBang mode through the FTDI ``ftd2xx``
    library. ``self.is_closed`` is True whenever no device is open; in that
    state :meth:`ConverseWithEncoder` returns ``(-1, 0)``.
    '''

    def __init__(self):
        '''Initialise the encoder instance. Requires the ftd2xx.dll to be in
        the working directory. No libusb is required.

        The device whose serial number equals ``cfg.encoder`` is opened and
        configured. If the library or the device is missing, the instance is
        left closed and a message is printed where applicable.
        '''
        self.index = None
        self.is_closed = True
        # FT_HANDLE is a pointer-sized value, so keep it in a c_void_p.
        self.hand = ct.c_void_p()
        try:
            self.ftlib = ct.windll.ftd2xx
        except (AttributeError, OSError):
            # AttributeError: ctypes has no ``windll`` outside Windows.
            # OSError (WindowsError on Windows): the DLL could not be loaded.
            return
        devType = ct.c_ulong()
        locId = ct.c_ulong()
        devId = ct.c_ulong()
        serialNumber = ct.create_string_buffer(16)
        description = ct.create_string_buffer(64)
        flags = ct.c_ulong()
        handle = ct.c_void_p()
        # The library writes a 32-bit count; a 16-bit buffer would overflow.
        ndev = ct.c_ulong()
        status = self.ftlib.FT_CreateDeviceInfoList(ct.byref(ndev))
        if status != 0 or ndev.value == 0:
            print('No Serial device connected to computer')
            return
        for index in range(ndev.value):
            status = self.ftlib.FT_GetDeviceInfoDetail(
                index, ct.byref(flags), ct.byref(devType), ct.byref(devId),
                ct.byref(locId), serialNumber, description, ct.byref(handle))
            if status == 0 and serialNumber.value == cfg.encoder:
                self.index = index
                self.hand = handle
                self.openEncoder()
                if not self.is_closed:
                    self.setTimeout(timeout=20)
                    self.setBaudrate(baudrate=460800)
                    self.setBitMode(bitmode=4)
                break
        if self.index is None:
            print('the encoder was not found')

    def setBitMode(self, bitmode=4):
        '''Sets the special bitmode for conversing with the encoder.

        * Normal bitmode is 0
        * Asynchronous Bitbang is 1
        * Synchronous Bitbang is 4

        The pin mask passed to the library is always 1.
        '''
        mode = ct.c_ubyte(bitmode)
        mask = ct.c_ubyte(1)
        status = self.ftlib.FT_SetBitMode(self.hand, mask, mode)
        if status != 0:
            print('Could not set the bitmode, error %g' % status)

    def setTimeout(self, timeout=20):
        ''' Sets the read and write timeouts in milliseconds
        '''
        tout = ct.c_ulong(timeout)
        self.ftlib.FT_SetTimeouts(self.hand, tout, tout)

    def openEncoder(self):
        ''' Opens the connection with the serial encoder.

        ``is_closed`` is cleared only when the library reports success.
        '''
        status = self.ftlib.FT_Open(self.index, ct.byref(self.hand))
        if status != 0:
            print('Opening connection with encoder failed, error %g' % status)
            return
        self.is_closed = False

    def setBaudrate(self, baudrate=460800):
        '''Sets the baudrate for the communication. The encoder needs quite a
        high baudrate to even spit an answer.

        Default baudrate is 460800. Better not change it.
        '''
        rate = ct.c_ulong(baudrate)
        status = self.ftlib.FT_SetBaudRate(self.hand, rate)
        if status != 0:
            print('Setting the baudrate failed, error %g' % status)

    def purge(self):
        ''' Purge the buffer of the input and output pins.
        '''
        clearbuff = ct.c_ulong(3)
        status = self.ftlib.FT_Purge(self.hand, clearbuff)
        if status != 0:
            print('Did not purge buffers, error %g' % status)

    def WritetoEncoder(self):
        ''' Sending the pulse signal to the encoder.

        The buffers are purged first, then 57 bytes of an alternating
        0x01/0x00 pattern are written.
        '''
        buff = ct.create_string_buffer('\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00\x01\x00')
        written = ct.c_ulong()
        self.purge()
        status = self.ftlib.FT_Write(self.hand, buff, 57, ct.byref(written))
        if status != 0:
            print('Writing to buffer failed, error %g' % status)

    @staticmethod
    def _decodeBits(buf, offset, nbits):
        '''Assemble an integer (most significant bit first) from bit 1 of
        every second byte of ``buf``, starting at ``offset``.'''
        value = 0
        for j in range(nbits):
            value = (value << 1) | ((ord(buf[offset + 2*j]) & 2) >> 1)
        return value

    def ReadEncoder(self):
        ''' Reads the response of the encoder.

        The response is already translated in the information multi- and
        single turns.

        :return mturns: the number of full turns (8 bits)
        :return sturns: the position within the turn, as a whole number of
            degrees (12-bit reading * 360 / 4096, truncated)

        ``(0, 0)`` is returned when the read fails.
        NOTE(review): the number of bytes actually read is not checked, and
        the truncation to whole degrees discards encoder resolution --
        confirm both are intended.
        '''
        read = ct.create_string_buffer(64)
        rbytes = ct.c_ulong()
        status = self.ftlib.FT_Read(self.hand, read, 64, ct.byref(rbytes))
        if status != 0:
            print('Read the encoder failed, error %g' % status)
            return 0, 0
        multiturn = self._decodeBits(read, 2*6, 8)
        singleturn = self._decodeBits(read, 2*14, 12)
        # Floor division keeps the integer result of the Python 2 original.
        return multiturn, singleturn*360//4096

    def ConverseWithEncoder(self):
        '''Wrapper around WritetoEncoder and ReadEncoder.

        :return: ``(-1, 0)`` if the encoder is closed, otherwise the result
            of :meth:`ReadEncoder`
        '''
        if self.is_closed:
            return -1, 0
        self.WritetoEncoder()
        return self.ReadEncoder()

    def closeEncoder(self):
        '''Closes the connection to the encoder, if it is open'''
        if not self.is_closed:
            self.ftlib.FT_Close(self.hand)
            self.is_closed = True
class RHSensor(object):
    """ Class for humidity/temperature sensor.

    The sensor is reached over a serial port. Readings are requested with
    short text commands ('H', 'C', 'ENQ', each followed by a carriage return)
    and returned as strings.
    """
    ser = None
    serialNumber = None

    def __init__(self, **kwargs):
        """ Initialize and open the connection with the humidity
        and temperature sensor.

        Keywords:
            - port = 'COM3'
            - idVendor = '0403'
            - idProduct = '6001'
            - serialNumber = 'FDXXXX'
            - timeout = 1

        Without a serial number, the candidate ports returned by
        :meth:`findCOMPort` are tried in turn and the first one that can be
        opened is used. With a serial number, the port is looked up from it.
        In both cases the ``port`` keyword ends up overwritten.
        If no candidate port can be opened, ``self.ser`` stays None.
        """
        port = kwargs.pop('port', None)
        serialNumber = kwargs.pop('serialNumber', None)
        idVendor = kwargs.pop('idVendor', '0403')
        idProduct = kwargs.pop('idProduct', '6001')
        timeout = kwargs.pop('timeout', 2)
        self.port = port
        # Fall back to the defaults for empty values so that the attributes
        # always exist for findCOMPort and the read loops.
        self.idVendor = idVendor or '0403'
        self.idProduct = idProduct or '6001'
        self.timeout = timeout or 2
        if serialNumber:
            self.serialNumber = serialNumber
        if not self.serialNumber:
            ## If more than one sensor is connected to the computer:
            for portName, serID in (self.findCOMPort() or []):
                self.port = portName
                self.serialNumber = serID
                print('testing port %s ' % portName)
                try:
                    self.ser = openSerial(port=self.port, baudrate=9600,
                                          parity=serial.PARITY_NONE, xonxoff=False,
                                          stopbits=serial.STOPBITS_ONE, timeout=timeout)
                except serial.SerialException:
                    print('Port %s already in use' % portName)
                    continue
                print('connected to port %s' % portName)
                self.name = self._locationName(self.serialNumber)
                break
        else:
            self.name = self._locationName(self.serialNumber)
            print('Sensor name: '+self.name)
            self.port = matchID2Port(serial=self.serialNumber)
            print(self.port)
            self.ser = openSerial(port=self.port, baudrate=9600,
                                  parity=serial.PARITY_NONE,
                                  xonxoff=False, stopbits=1, timeout=timeout)

    @staticmethod
    def _locationName(serialNumber):
        """Map a sensor serial number to its location name using the serial
        numbers in the configuration; unknown sensors give 'nolocation'."""
        if serialNumber == cfg.RHoutside:
            return 'dome'
        if serialNumber == cfg.RHinDome:
            return 'electronics_box'
        if serialNumber == cfg.RHinEnclosure:
            return 'enclosure'
        return 'nolocation'

    def findCOMPort(self, inport='COM'):
        """Find the serial ports of the connected sensors.

        :param str inport: prefix of the port names to look for
        :return: list of tuples (port name, serial number), or None if the
            port lookup returned None
        :raises Exception: on an unsupported operating system or when no
            device matches the vendor and product IDs
        """
        from os import name
        nisma = matchPort2ID(inport=inport, startstr='FTDIBUS\\')
        if nisma is None:
            return None
        ports, serID = nisma
        if name == 'nt':   # for Windows 7
            vendor = 'VID_'+self.idVendor
            product = 'PID_'+self.idProduct
            rhports = [(p[0], s) for p, s in zip(ports, serID)
                       if vendor in p[1] and product in p[1]]
        elif name == 'posix':   # for Mac
            # NOTE(review): this reads a third field (p[2]) while the port
            # lookup above builds 2-tuples -- verify before relying on it.
            wanted = 'VID:PID='+self.idVendor.lstrip('0')+':'+self.idProduct.lstrip('0')
            rhports = [(p[0], s) for p, s in zip(ports, serID) if wanted in p[2]]
        else:
            raise Exception('Sorry ... Unsupported operating system')
        if len(rhports) == 0:
            raise Exception('Cannot find the humidity/temperature sensor. Please check that it is plugged in and that Vendor ID = '+self.idVendor+' and Product ID = '+self.idProduct)
        return rhports

    def isOpen(self):
        """Checks if the serial connection is opened or closed"""
        return self.ser.isOpen()

    def open(self):
        """Opens the serial connection if it was closed"""
        if not isSerialOpen(self.ser):
            self.ser.open()

    def close(self):
        """Closes the serial connection if it was open"""
        if isSerialOpen(self.ser):
            self.ser.close()

    def _query(self, command):
        """Send ``command`` and poll the port until an answer arrives or
        ``self.timeout`` seconds have passed.

        :return: the answer, or None (after issuing a warning) on timeout
        """
        from time import time, sleep
        writeSerial(self.ser, command)
        sleep(0.1)
        t0 = time()
        t1 = time()
        data = readSerial(self.ser)
        while data == '' and t1 < t0+self.timeout:
            sleep(0.1)
            data = readSerial(self.ser)
            t1 = time()
        if t1 > t0+self.timeout:
            warnings.warn('Operation timed out ('+str(t1-t0)+'). Read data: '+data)
            return None
        return data

    def _reading(self, command, pattern):
        """Query the sensor and return the part of the answer matching
        ``pattern``; '' on timeout or when the answer does not match."""
        import re
        data = self._query(command)
        if data is None:
            return ''
        found = re.search(pattern, data)
        return found.group(0) if found else ''

    def getHumidity(self):
        """Reads humidity from sensor

        Returns:
            - the humidity as a string ('' if no valid reading was obtained)
        """
        # Up to three integer digits so that '100.0' is accepted.
        return self._reading('H'+chr(13), r'^\d{1,3}\.\d')

    def getTemperature(self):
        """Reads temperature from sensor

        Returns:
            the temperature as a string ('' if no valid reading was obtained)
        """
        # Optional sign: assumes sub-zero readings are sent with a leading
        # '-' -- TODO confirm against the sensor documentation.
        return self._reading('C\r', r'^-?\d{1,3}\.\d')

    def getAll(self):
        """Reads temperature, humidity and device status.

        :return tuple (temp, hum, status): where
            - temp, the temperature as a string
            - hum, the humidity as a string
            - status, the result of :meth:`getStatus`

        ``(0.0, 0.0, 'Denied')`` is returned on a serial error.
        """
        try:
            return (self.getTemperature(), self.getHumidity(), self.getStatus())
        except serial.SerialException:
            return (0.0, 0.0, 'Denied')

    def _series(self, reader, label, dt, npts, verbose, plotData):
        """Sample ``reader`` ``npts`` times, ``dt`` seconds apart.

        Readings that cannot be converted to float are left at 0. With
        ``plotData`` the points are drawn live, using ``label`` for the
        y axis.

        :return tuple (times, values): two float arrays of length npts
        """
        import time
        from numpy import zeros
        tim = zeros(npts, dtype=float)
        val = zeros(npts, dtype=float)
        if plotData:
            from pylab import ion, figure, ioff, gca, show, pause, plot, draw, xlabel, ylabel
            ion()
            figure()
            line, = plot([], [], 'b+')
            xlabel('Time (in seconds)')
            ylabel(label)
            ax = gca()
        for j in range(npts):
            try:
                tim[j] = time.time()
                val[j] = float(reader())
            except ValueError:
                pass
            time.sleep(dt)
            if verbose:
                print(str(tim[j])+' '+str(val[j])+'- '+str(int(float(j+1)/float(npts)*100))+"% completed")
            if plotData:
                line.set_xdata(tim[0:j+1]-tim[0])
                line.set_ydata(val[0:j+1])
                ax.relim()
                ax.autoscale_view()
                draw()
                pause(0.01)
        if plotData:
            ioff()
            show()
        return (tim, val)

    def getCHumidity(self, dt=0, npts=100, verbose=False, plotData=False):
        """Gets a continuous reading of the humidity.

        :param int dt: time interval to wait in seconds between two measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: plot the samples while they are taken
        :returns tuple (t,hum): t, time as a float, and hum, humidity as a float
        """
        return self._series(self.getHumidity, 'Humidity (in %)',
                            dt, npts, verbose, plotData)

    def getCTemperature(self, dt=0, npts=100, verbose=False, plotData=False):
        """
        Gets a continuous reading of the temperature.

        :param int dt: time interval to wait in seconds between two measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: plot the samples while they are taken
        :return tuple (t,temp): t, time as a float, and temp, temperature as a float
        """
        return self._series(self.getTemperature, 'Temperature (in Celsius)',
                            dt, npts, verbose, plotData)

    def getCAll(self, dt=0, npts=100, verbose=False, plotData=False):
        """
        Gets a continuous reading of the humidity and temperature.

        :param int dt: time interval to wait in seconds between two measurements (default = 0)
        :param int npts: total number of points (default = 100)
        :param bool verbose: print every sample and the progress
        :param bool plotData: accepted but currently not used
        :return tuple (t,hum,temp): with:
            - t: time as a float
            - hum: humidity as a float
            - temp: temperature as a float
        """
        import time
        from numpy import zeros
        tim = zeros(npts, dtype=float)
        temp = zeros(npts, dtype=float)
        hum = zeros(npts, dtype=float)
        for j in range(npts):
            try:
                tim[j] = time.time()
                temp[j] = float(self.getTemperature())
                hum[j] = float(self.getHumidity())
            except ValueError:
                pass
            time.sleep(dt)
            if verbose:
                print(str(tim[j])+' '+str(hum[j])+' '+str(temp[j])+'- '+str(int(float(j+1)/float(npts)*100))+"% completed")
        return (tim, hum, temp)

    def getStatus(self):
        '''
        Read from WMI the status of the device connected to a specific port.
        The device is the one whose ID contains this sensor's serial number;
        None is returned when no such device is listed.

        Answers are:
            - OK
            - Error
            - Degraded
            - Unknown
            - PredFail
            - Starting
            - Stopping
            - NonRecover
            - No Contact
            - Lost Comm
        '''
        import wmi
        for association in wmi.WMI().Win32_USBControllerDevice():
            if association.Dependent.DeviceID.find(self.serialNumber) > 0:
                return association.Dependent.Status

    def getInfo(self):
        """Reads sensor model number and firmware version from
        humidity/temperature sensor ('' on timeout)."""
        data = self._query('ENQ\r')
        return '' if data is None else data
def listallports():
    ''' Returns the number of USB controller/device associations that WMI
    reports on this computer (a count, not the list itself).'''
    import wmi
    connection = wmi.WMI()
    return len(connection.win32_USBControllerDevice())
def matchPort2ID(inport = 'COM', startstr = "FTDIBUS"):
    ''' For devices which are connected to the computer, matchPort2ID matches
    the virtual serial port with the device serial number given by the
    provider.

    Only devices whose WMI device ID starts with ``startstr`` and whose
    caption contains a port name (``inport`` followed by digits) are kept.

    :param str inport: prefix of the port names, e.g. 'COM'
    :param str startstr: required prefix of the device ID
    :return tuple (port, serID): ``port`` is a list of tuples
        (port name, device ID part before the serial number) and ``serID``
        the list of serial numbers, in the same order

    NOTE(review): the serial number is taken between the first '+N' and the
    first 'A' of the device ID, as before -- this presumably fits the IDs of
    the sensors in use; verify for other devices.
    '''
    import re
    import wmi
    # Whole port number, so that COM10 and above are not cut to 4 characters.
    portPattern = re.compile(re.escape(inport) + r'\d+')
    port = []
    serID = []
    for item in wmi.WMI().Win32_USBControllerDevice():
        deviceID = str(item.Dependent.DeviceID)
        if not deviceID.startswith(startstr):
            continue
        found = portPattern.search(item.Dependent.Caption)
        if found is None:
            continue
        # Remove the prefix itself; str.lstrip would strip a character set.
        tail = deviceID[len(startstr):]
        startID = tail.find('+N')
        endID = tail.find('A')
        port.append((str(found.group(0)), str(tail[0:startID])))
        serID.append(str(tail[startID+1:endID]))
    return port, serID
def matchID2Port(serial=None):
    ''' Matches a serial port to the given serial number.

    :param str serial: serial number to look for in the WMI device IDs
    :return: the port name ('COM' followed by its number) of the first
        matching device whose caption names a port, or None if there is none
        or no serial number was given
    '''
    import re
    import wmi
    if not serial:
        return None
    for item in wmi.WMI().win32_USBControllerDevice():
        if str(item.Dependent.DeviceID).find(serial) > 0:
            # Whole port number, so that COM10 and above are not truncated.
            found = re.search(r'COM\d+', item.Dependent.Caption)
            if found is not None:
                return str(found.group(0))
    return None
def listSerialPorts():
    """ Lists all existing serial connections

    Args: None
    Returns: list with one entry per port, as produced by
             ``serial.tools.list_ports.comports`` -- per the original notes
             each entry holds (portstr, descr, info):
                portstr: Port name
                descr: Description
                info: Additional information that might include vendor ID
                      and product ID
    """
    from serial.tools import list_ports
    return [entry for entry in list_ports.comports()]
def isSerialOpen(ser):
    """ Checks if a serial connection is open

    Args: - ser: the serial connection to check
    Returns: - True if the serial connection is open and False if it is not
    Raises: - TypeError if ``ser`` is not a pyserial ``Serial`` object
    """
    # isinstance replaces the comparison of str(type(ser)) against
    # platform-specific class names.
    if not isinstance(ser, serial.Serial):
        raise TypeError('This is not a serial connection')
    return ser.isOpen()
def openSerial(**kwargs):
    """ opens a serial connection

    Args (all optional keywords; each default comes from the configuration
    module):
        port: the name of the port to open
        baudrate: the baudrate
        parity: the parity
        stopbits: the stopbits
        xonxoff: software flow control switch
        timeout: the read timeout
    Returns:
        The serial connection if successfully open
    Raises:
        Exception if the port does not report itself open after opening
    """
    import time
    port = kwargs.pop('port', cfg.defaultPort)
    # Remaining settings, in the order in which they are applied.
    settings = (
        ('baudrate', kwargs.pop('baudrate', cfg.defaultBaudrate)),
        ('parity', kwargs.pop('parity', cfg.defaultParity)),
        ('stopbits', kwargs.pop('stopbits', cfg.defaultStops)),
        ('xonxoff', kwargs.pop('xonxoff', cfg.defaultXonXoff)),
        ('timeout', kwargs.pop('timeout', cfg.defaultTimeout)),
    )
    ser = serial.Serial()
    ser.port = port
    print('port selected')
    for attribute, value in settings:
        setattr(ser, attribute, value)
    print('Is serial port %s open? %g' % (port, ser.isOpen()))
    if isSerialOpen(ser):
        # The object already reports open: close it and wait before reopening.
        print('Cannot open the port %s. Closing it' % port)
        ser.close()
        time.sleep(1)
        print(ser.isOpen())
    ser.open()
    print('connected to port')
    if not isSerialOpen(ser):
        raise Exception("The serial connection could not be opened")
    return ser
def closeSerial(ser):
    """ Closes a serial connection; does nothing if it is already closed

    Args: - ser: the serial connection to close
    Returns: None
    """
    if not isSerialOpen(ser):
        return
    ser.close()
def writeSerial(ser,cmd):
    """ writes a command to a serial connection

    Args: - ser: the serial connection to write to
          - cmd: the command to write (typically ending by '\\r')
    Returns: - the value returned by ``ser.write`` (the number of bytes
               written, per the original notes)
    Raises: - Exception if the connection is not open
    """
    if isSerialOpen(ser):
        return ser.write(cmd)
    raise Exception("The serial connection is closed or not valid, you cannot write to it")
def readSerial(ser):
    """ reads from a serial connection

    Characters are read one at a time until a read returns nothing (i.e. the
    port's read timeout expires). Carriage returns and line feeds are
    dropped.

    Args:
        ser: the serial connection to read from
    Returns:
        the string read from the device ('' if nothing arrived)
    Raises:
        Exception if the connection is not open
    """
    if not isSerialOpen(ser):
        raise Exception("The serial connection is closed or not valid, you cannot read from it")
    received = []
    c = ser.read(size=1)
    while c:
        if c != '\r' and c != '\n':
            received.append(c)
        c = ser.read(size=1)
    return ''.join(received)