From 8cb8d76f352f17885fb177369fdc872189941b3b Mon Sep 17 00:00:00 2001 From: shrkey Date: Mon, 12 Sep 2016 18:41:28 +0100 Subject: [PATCH] quick update test --- 640steppertest.py | 84 ++++++++ darkwater_640/I2C.py | 4 +- darkwater_640/__init__.py | 2 +- darkwater_640/darkwater_640.py | 353 ++++++++++++++++++--------------- darkwater_640/smbus.py | 294 +++++++++++++++++++++++++++ 5 files changed, 572 insertions(+), 165 deletions(-) create mode 100644 640steppertest.py create mode 100644 darkwater_640/smbus.py diff --git a/640steppertest.py b/640steppertest.py new file mode 100644 index 0000000..568d40f --- /dev/null +++ b/640steppertest.py @@ -0,0 +1,84 @@ +import time +from darkwater_640.darkwater_640 import dw_Controller, dw_Servo, dw_Motor + +dw = dw_Controller( addr=0x60 ) +m1 = dw.getMotor(1) +m2 = dw.getMotor(2) +m3 = dw.getMotor(3) +m4 = dw.getMotor(4) +m5 = dw.getMotor(5) +m6 = dw.getMotor(6) + +m1.off() +m2.off() +m3.off() +m4.off() +m5.off() +m6.off() +time.sleep(1) + +##time.sleep(10) +print "Set forward - " +print "Motor 1" +m1.setMotorSpeed(255) +time.sleep(1) +print "Motor 2" +m2.setMotorSpeed(255) +time.sleep(1) +print "Motor 3" +m3.setMotorSpeed(255) +time.sleep(1) +print "Motor 4" +m4.setMotorSpeed(255) +time.sleep(1) +print "Motor 5" +m5.setMotorSpeed(255) +time.sleep(1) +print "Motor 6" +m6.setMotorSpeed(255) +time.sleep(1) +print "Stopping - " +print "Motor 1" +m1.setMotorSpeed(0) +time.sleep(1) +print "Motor 2" +m2.setMotorSpeed(0) +time.sleep(1) +print "Motor 3" +m3.setMotorSpeed(0) +time.sleep(1) +print "Motor 4" +m4.setMotorSpeed(0) +time.sleep(1) +print "Motor 5" +m5.setMotorSpeed(0) +time.sleep(1) +print "Motor 6" +m6.setMotorSpeed(0) +time.sleep(1) +print "Set reverse - " +print "Motor 1" +m1.setMotorSpeed(-255) +time.sleep(1) +print "Motor 2" +m2.setMotorSpeed(-255) +time.sleep(1) +print "Motor 3" +m3.setMotorSpeed(-255) +time.sleep(1) +print "Motor 4" +m4.setMotorSpeed(-255) +time.sleep(1) +print "Motor 5" +m5.setMotorSpeed(-255) +time.sleep(1) +print "Motor 6" +m6.setMotorSpeed(-255) +time.sleep(1) +print "All off" +m1.off() +m2.off() +m3.off() +m4.off() +m5.off() +m6.off() diff --git a/darkwater_640/I2C.py b/darkwater_640/I2C.py index 6097ed5..a45a21d 100644 --- a/darkwater_640/I2C.py +++ b/darkwater_640/I2C.py @@ -93,8 +93,8 @@ class Device(object): self._address = address if i2c_interface is None: # Use pure python I2C interface if none is specified. - import Adafruit_PureIO.smbus - self._bus = Adafruit_PureIO.smbus.SMBus(busnum) + import smbus + self._bus = smbus.SMBus(busnum) else: # Otherwise use the provided class to create an smbus interface. self._bus = i2c_interface(busnum) diff --git a/darkwater_640/__init__.py b/darkwater_640/__init__.py index 43f46fb..9d5f80d 100644 --- a/darkwater_640/__init__.py +++ b/darkwater_640/__init__.py @@ -1 +1 @@ -from .darkwater_640 import dw_Motor, dw_Controller +from .darkwater_640 import dw_Motor, dw_Servo, dw_Controller diff --git a/darkwater_640/darkwater_640.py b/darkwater_640/darkwater_640.py index 80ed6e1..a770642 100644 --- a/darkwater_640/darkwater_640.py +++ b/darkwater_640/darkwater_640.py @@ -5,167 +5,194 @@ from PCA9685 import PCA9685 import time import math -# class Adafruit_StepperMotor: -# MICROSTEPS = 8 -# MICROSTEP_CURVE = [0, 50, 98, 142, 180, 212, 236, 250, 255] -# -# #MICROSTEPS = 16 -# # a sinusoidal curve NOT LINEAR! -# #MICROSTEP_CURVE = [0, 25, 50, 74, 98, 120, 141, 162, 180, 197, 212, 225, 236, 244, 250, 253, 255] -# -# def __init__(self, controller, num, steps=200): -# self.MC = controller -# self.revsteps = steps -# self.motornum = num -# self.sec_per_step = 0.1 -# self.steppingcounter = 0 -# self.currentstep = 0 -# -# num -= 1 -# -# if (num == 0): -# self.PWMA = 8 -# self.AIN2 = 9 -# self.AIN1 = 10 -# self.PWMB = 13 -# self.BIN2 = 12 -# self.BIN1 = 11 -# elif (num == 1): -# self.PWMA = 2 -# self.AIN2 = 3 -# self.AIN1 = 4 -# self.PWMB = 7 -# self.BIN2 = 6 -# self.BIN1 = 5 -# else: -# raise NameError('MotorHAT Stepper must be between 1 and 2 inclusive') -# -# def setSpeed(self, rpm): -# self.sec_per_step = 60.0 / (self.revsteps * rpm) -# self.steppingcounter = 0 -# -# def oneStep(self, dir, style): -# pwm_a = pwm_b = 255 -# -# # first determine what sort of stepping procedure we're up to -# if (style == Adafruit_MotorHAT.SINGLE): -# if ((self.currentstep/(self.MICROSTEPS/2)) % 2): -# # we're at an odd step, weird -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += self.MICROSTEPS/2 -# else: -# self.currentstep -= self.MICROSTEPS/2 -# else: -# # go to next even step -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += self.MICROSTEPS -# else: -# self.currentstep -= self.MICROSTEPS -# if (style == Adafruit_MotorHAT.DOUBLE): -# if not (self.currentstep/(self.MICROSTEPS/2) % 2): -# # we're at an even step, weird -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += self.MICROSTEPS/2 -# else: -# self.currentstep -= self.MICROSTEPS/2 -# else: -# # go to next odd step -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += self.MICROSTEPS -# else: -# self.currentstep -= self.MICROSTEPS -# if (style == Adafruit_MotorHAT.INTERLEAVE): -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += self.MICROSTEPS/2 -# else: -# self.currentstep -= self.MICROSTEPS/2 -# -# if (style == Adafruit_MotorHAT.MICROSTEP): -# if (dir == Adafruit_MotorHAT.FORWARD): -# self.currentstep += 1 -# else: -# self.currentstep -= 1 -# -# # go to next 'step' and wrap around -# self.currentstep += self.MICROSTEPS * 4 -# self.currentstep %= self.MICROSTEPS * 4 -# -# pwm_a = pwm_b = 0 -# if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS): -# pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS - self.currentstep] -# pwm_b = self.MICROSTEP_CURVE[self.currentstep] -# elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2): -# pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS] -# pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*2 - self.currentstep] -# elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3): -# pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS*3 - self.currentstep] -# pwm_b = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*2] -# elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4): -# pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*3] -# pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*4 - self.currentstep] -# -# -# # go to next 'step' and wrap around -# self.currentstep += self.MICROSTEPS * 4 -# self.currentstep %= self.MICROSTEPS * 4 -# -# # only really used for microstepping, otherwise always on! -# self.MC._pwm.setPWM(self.PWMA, 0, pwm_a*16) -# self.MC._pwm.setPWM(self.PWMB, 0, pwm_b*16) -# -# # set up coil energizing! -# coils = [0, 0, 0, 0] -# -# if (style == Adafruit_MotorHAT.MICROSTEP): -# if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS): -# coils = [1, 1, 0, 0] -# elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2): -# coils = [0, 1, 1, 0] -# elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3): -# coils = [0, 0, 1, 1] -# elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4): -# coils = [1, 0, 0, 1] -# else: -# step2coils = [ [1, 0, 0, 0], -# [1, 1, 0, 0], -# [0, 1, 0, 0], -# [0, 1, 1, 0], -# [0, 0, 1, 0], -# [0, 0, 1, 1], -# [0, 0, 0, 1], -# [1, 0, 0, 1] ] -# coils = step2coils[self.currentstep/(self.MICROSTEPS/2)] -# -# #print "coils state = " + str(coils) -# self.MC.setPin(self.AIN2, coils[0]) -# self.MC.setPin(self.BIN1, coils[1]) -# self.MC.setPin(self.AIN1, coils[2]) -# self.MC.setPin(self.BIN2, coils[3]) -# -# return self.currentstep -# -# def step(self, steps, direction, stepstyle): -# s_per_s = self.sec_per_step -# lateststep = 0 -# -# if (stepstyle == Adafruit_MotorHAT.INTERLEAVE): -# s_per_s = s_per_s / 2.0 -# if (stepstyle == Adafruit_MotorHAT.MICROSTEP): -# s_per_s /= self.MICROSTEPS -# steps *= self.MICROSTEPS -# -# print s_per_s, " sec per step" -# -# for s in range(steps): -# lateststep = self.oneStep(direction, stepstyle) -# time.sleep(s_per_s) -# -# if (stepstyle == Adafruit_MotorHAT.MICROSTEP): -# # this is an edge case, if we are in between full steps, lets just keep going -# # so we end on a full step -# while (lateststep != 0) and (lateststep != self.MICROSTEPS): -# lateststep = self.oneStep(dir, stepstyle) -# time.sleep(s_per_s) +class dw_Stepper: + MICROSTEPS = 8 + MICROSTEP_CURVE = [0, 50, 98, 142, 180, 212, 236, 250, 255] + + #MICROSTEPS = 16 + # a sinusoidal curve NOT LINEAR! + #MICROSTEP_CURVE = [0, 25, 50, 74, 98, 120, 141, 162, 180, 197, 212, 225, 236, 244, 250, 253, 255] + + def __init__(self, controller, num, steps=200): + self.speed = 0 + self.MC = controller + self.motornum = num + modepin = in1 = in2 = 0 + + self.revsteps = steps + self.sec_per_step = 0.1 + self.steppingcounter = 0 + self.currentstep = 0 + + if (num == 0): + ain2 = 2 #phase + ain1 = 3 #enable + bin2 = 4 #phase + bin1 = 5 #enable + elif (num == 1): + ain2 = 6 #phase + ain1 = 7 #enable + bin2 = 8 #phase + bin1 = 9 #enable + elif (num == 2): + ain2 = 10 #phase + ain1 = 11 #enable + bin2 = 12 #phase + bin1 = 13 #enable + else: + raise NameError('MotorHAT Stepper must be between 1 and 3 inclusive') + + self.PHpinA = ain2 + self.ENpinA = ain1 + self.PHpinB = bin2 + self.ENpinB = bin1 + # switch off both drivers + self.run(dw_Controller.RELEASE, 0) + + def run(self, command, speed = 0): + if not self.MC: + return + if (command == dw_Controller.FORWARD): + self.MC.setPin(self.PHpin, 0) + self.MC._pwm.set_pwm(self.ENpin, 0, speed*16) + if (command == dw_Controller.BACKWARD): + self.MC.setPin(self.PHpin, 1) + self.MC._pwm.set_pwm(self.ENpin, 0, speed*16) + if (command == dw_Controller.RELEASE): + self.MC.setPin(self.PHpinA, 0) + self.MC.setPin(self.ENpinA, 0) + self.MC.setPin(self.PHpinB, 0) + self.MC.setPin(self.ENpinB, 0) + + def off(self): + self.run(dw_Controller.RELEASE, 0) + + def setSpeed(self, rpm): + self.sec_per_step = 60.0 / (self.revsteps * rpm) + self.steppingcounter = 0 + + def oneStep(self, dir, style): + pwm_a = pwm_b = 255 + + # first determine what sort of stepping procedure we're up to + if (style == dw_Controller.SINGLE): + if ((self.currentstep/(self.MICROSTEPS/2)) % 2): + # we're at an odd step, weird + if (dir == dw_Controller.FORWARD): + self.currentstep += self.MICROSTEPS/2 + else: + self.currentstep -= self.MICROSTEPS/2 + else: + # go to next even step + if (dir == dw_Controller.FORWARD): + self.currentstep += self.MICROSTEPS + else: + self.currentstep -= self.MICROSTEPS + if (style == dw_Controller.DOUBLE): + if not (self.currentstep/(self.MICROSTEPS/2) % 2): + # we're at an even step, weird + if (dir == dw_Controller.FORWARD): + self.currentstep += self.MICROSTEPS/2 + else: + self.currentstep -= self.MICROSTEPS/2 + else: + # go to next odd step + if (dir == dw_Controller.FORWARD): + self.currentstep += self.MICROSTEPS + else: + self.currentstep -= self.MICROSTEPS + if (style == dw_Controller.INTERLEAVE): + if (dir == dw_Controller.FORWARD): + self.currentstep += self.MICROSTEPS/2 + else: + self.currentstep -= self.MICROSTEPS/2 + + if (style == dw_Controller.MICROSTEP): + if (dir == dw_Controller.FORWARD): + self.currentstep += 1 + else: + self.currentstep -= 1 + + # go to next 'step' and wrap around + self.currentstep += self.MICROSTEPS * 4 + self.currentstep %= self.MICROSTEPS * 4 + + pwm_a = pwm_b = 0 + if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS): + pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS - self.currentstep] + pwm_b = self.MICROSTEP_CURVE[self.currentstep] + elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2): + pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS] + pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*2 - self.currentstep] + elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3): + pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS*3 - self.currentstep] + pwm_b = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*2] + elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4): + pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*3] + pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*4 - self.currentstep] + + + # go to next 'step' and wrap around + self.currentstep += self.MICROSTEPS * 4 + self.currentstep %= self.MICROSTEPS * 4 + + # only really used for microstepping, otherwise always on! + self.MC._pwm.setPWM(self.PWMA, 0, pwm_a*16) + self.MC._pwm.setPWM(self.PWMB, 0, pwm_b*16) + + # set up coil energizing! + coils = [0, 0, 0, 0] + + if (style == dw_Controller.MICROSTEP): + if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS): + coils = [1, 1, 0, 0] + elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2): + coils = [0, 1, 1, 0] + elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3): + coils = [0, 0, 1, 1] + elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4): + coils = [1, 0, 0, 1] + else: + step2coils = [ [1, 0, 0, 0], + [1, 1, 0, 0], + [0, 1, 0, 0], + [0, 1, 1, 0], + [0, 0, 1, 0], + [0, 0, 1, 1], + [0, 0, 0, 1], + [1, 0, 0, 1] ] + coils = step2coils[self.currentstep/(self.MICROSTEPS/2)] + + #print "coils state = " + str(coils) + self.MC.setPin(self.AIN2, coils[0]) + self.MC.setPin(self.BIN1, coils[1]) + self.MC.setPin(self.AIN1, coils[2]) + self.MC.setPin(self.BIN2, coils[3]) + + return self.currentstep + + def step(self, steps, direction, stepstyle): + s_per_s = self.sec_per_step + lateststep = 0 + + if (stepstyle == dw_Controller.INTERLEAVE): + s_per_s = s_per_s / 2.0 + if (stepstyle == dw_Controller.MICROSTEP): + s_per_s /= self.MICROSTEPS + steps *= self.MICROSTEPS + + print s_per_s, " sec per step" + + for s in range(steps): + lateststep = self.oneStep(direction, stepstyle) + time.sleep(s_per_s) + + if (stepstyle == dw_Controller.MICROSTEP): + # this is an edge case, if we are in between full steps, lets just keep going + # so we end on a full step + while (lateststep != 0) and (lateststep != self.MICROSTEPS): + lateststep = self.oneStep(dir, stepstyle) + time.sleep(s_per_s) class dw_Motor: def __init__(self, controller, num): @@ -306,11 +333,13 @@ class dw_Controller: GPIO.setmode(GPIO.BCM) GPIO.setwarnings(False) GPIO.setup(27, GPIO.OUT) - GPIO.output(27, GPIO.HIGH) + GPIO.output(27, GPIO.HIGH) # set for en/phase mode - low = in/in mode self.motors = [ dw_Motor(self, m) for m in range(6) ] self.servos = [ dw_Servo(self, m, freq) for m in range(2) ] + self.steppers = [ dw_Stepper(self, m) for m in range(3) ] + def setPin(self, pin, value): if (pin < 0) or (pin > 15): raise NameError('PWM pin must be between 0 and 15 inclusive') diff --git a/darkwater_640/smbus.py b/darkwater_640/smbus.py new file mode 100644 index 0000000..67660e7 --- /dev/null +++ b/darkwater_640/smbus.py @@ -0,0 +1,294 @@ +# Copyright (c) 2016 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +from ctypes import * +from fcntl import ioctl +import struct + +# I2C C API constants (from linux kernel headers) +I2C_M_TEN = 0x0010 # this is a ten bit chip address +I2C_M_RD = 0x0001 # read data, from slave to master +I2C_M_STOP = 0x8000 # if I2C_FUNC_PROTOCOL_MANGLING +I2C_M_NOSTART = 0x4000 # if I2C_FUNC_NOSTART +I2C_M_REV_DIR_ADDR = 0x2000 # if I2C_FUNC_PROTOCOL_MANGLING +I2C_M_IGNORE_NAK = 0x1000 # if I2C_FUNC_PROTOCOL_MANGLING +I2C_M_NO_RD_ACK = 0x0800 # if I2C_FUNC_PROTOCOL_MANGLING +I2C_M_RECV_LEN = 0x0400 # length will be first received byte + +I2C_SLAVE = 0x0703 # Use this slave address +I2C_SLAVE_FORCE = 0x0706 # Use this slave address, even if + # is already in use by a driver! +I2C_TENBIT = 0x0704 # 0 for 7 bit addrs, != 0 for 10 bit +I2C_FUNCS = 0x0705 # Get the adapter functionality mask +I2C_RDWR = 0x0707 # Combined R/W transfer (one STOP only) +I2C_PEC = 0x0708 # != 0 to use PEC with SMBus +I2C_SMBUS = 0x0720 # SMBus transfer + + +# ctypes versions of I2C structs defined by kernel. +class i2c_msg(Structure): + _fields_ = [ + ('addr', c_uint16), + ('flags', c_uint16), + ('len', c_uint16), + ('buf', POINTER(c_uint8)) + ] + +class i2c_rdwr_ioctl_data(Structure): + _fields_ = [ + ('msgs', POINTER(i2c_msg)), + ('nmsgs', c_uint32) + ] + + +def make_i2c_rdwr_data(messages): + """Utility function to create and return an i2c_rdwr_ioctl_data structure + populated with a list of specified I2C messages. The messages parameter + should be a list of tuples which represent the individual I2C messages to + send in this transaction. Tuples should contain 4 elements: address value, + flags value, buffer length, ctypes c_uint8 pointer to buffer. + """ + # Create message array and populate with provided data. + msg_data_type = i2c_msg*len(messages) + msg_data = msg_data_type() + for i, m in enumerate(messages): + msg_data[i].addr = m[0] & 0x7F + msg_data[i].flags = m[1] + msg_data[i].len = m[2] + msg_data[i].buf = m[3] + # Now build the data structure. + data = i2c_rdwr_ioctl_data() + data.msgs = msg_data + data.nmsgs = len(messages) + return data + + +# Create an interface that mimics the Python SMBus API. +class SMBus(object): + """I2C interface that mimics the Python SMBus API but is implemented with + pure Python calls to ioctl and direct /dev/i2c device access. + """ + + def __init__(self, bus=None): + """Create a new smbus instance. Bus is an optional parameter that + specifies the I2C bus number to use, for example 1 would use device + /dev/i2c-1. If bus is not specified then the open function should be + called to open the bus. + """ + self._device = None + if bus is not None: + self.open(bus) + + def __del__(self): + """Clean up any resources used by the SMBus instance.""" + self.close() + + def __enter__(self): + """Context manager enter function.""" + # Just return this object so it can be used in a with statement, like + # with SMBus(1) as bus: + # # do stuff! + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit function, ensures resources are cleaned up.""" + self.close() + return False # Don't suppress exceptions. + + def open(self, bus): + """Open the smbus interface on the specified bus.""" + # Close the device if it's already open. + if self._device is not None: + self.close() + # Try to open the file for the specified bus. Must turn off buffering + # or else Python 3 fails (see: https://bugs.python.org/issue20074) + self._device = open('/dev/i2c-{0}'.format(bus), 'r+b', buffering=0) + # TODO: Catch IOError and throw a better error message that describes + # what's wrong (i.e. I2C may not be enabled or the bus doesn't exist). + + def close(self): + """Close the smbus connection. You cannot make any other function + calls on the bus unless open is called!""" + if self._device is not None: + self._device.close() + self._device = None + + def _select_device(self, addr): + """Set the address of the device to communicate with on the I2C bus.""" + ioctl(self._device.fileno(), I2C_SLAVE, addr & 0x7F) + + def read_byte(self, addr): + """Read a single byte from the specified device.""" + assert self._device is not None, 'Bus must be opened before operations are made against it!' + self._select_device(addr) + return ord(self._device.read(1)) + + def read_byte_data(self, addr, cmd): + """Read a single byte from the specified cmd register of the device.""" + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Build ctypes values to marshall between ioctl and Python. + reg = c_uint8(cmd) + result = c_uint8() + # Build ioctl request. + request = make_i2c_rdwr_data([ + (addr, 0, 1, pointer(reg)), # Write cmd register. + (addr, I2C_M_RD, 1, pointer(result)) # Read 1 byte as result. + ]) + # Make ioctl call and return result data. + ioctl(self._device.fileno(), I2C_RDWR, request) + return result.value + + def read_word_data(self, addr, cmd): + """Read a word (2 bytes) from the specified cmd register of the device. + Note that this will interpret data using the endianness of the processor + running Python (typically little endian)! + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Build ctypes values to marshall between ioctl and Python. + reg = c_uint8(cmd) + result = c_uint16() + # Build ioctl request. + request = make_i2c_rdwr_data([ + (addr, 0, 1, pointer(reg)), # Write cmd register. + (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes). + ]) + # Make ioctl call and return result data. + ioctl(self._device.fileno(), I2C_RDWR, request) + return result.value + + def read_block_data(self, addr, cmd): + """Perform a block read from the specified cmd register of the device. + The amount of data read is determined by the first byte send back by + the device. Data is returned as a bytearray. + """ + # TODO: Unfortunately this will require calling the low level I2C + # access ioctl to trigger a proper read_block_data. The amount of data + # returned isn't known until the device starts responding so an I2C_RDWR + # ioctl won't work. + raise NotImplementedError() + + def read_i2c_block_data(self, addr, cmd, length=32): + """Perform a read from the specified cmd register of device. Length number + of bytes (default of 32) will be read and returned as a bytearray. + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Build ctypes values to marshall between ioctl and Python. + reg = c_uint8(cmd) + result = create_string_buffer(length) + # Build ioctl request. + request = make_i2c_rdwr_data([ + (addr, 0, 1, pointer(reg)), # Write cmd register. + (addr, I2C_M_RD, length, cast(result, POINTER(c_uint8))) # Read data. + ]) + # Make ioctl call and return result data. + ioctl(self._device.fileno(), I2C_RDWR, request) + return bytearray(result.raw) # Use .raw instead of .value which will stop at a null byte! + + def write_quick(self, addr): + """Write a single byte to the specified device.""" + # What a strange function, from the python-smbus source this appears to + # just write a single byte that initiates a write to the specified device + # address (but writes no data!). The functionality is duplicated below + # but the actual use case for this is unknown. + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Build ioctl request. + request = make_i2c_rdwr_data([ + (addr, 0, 0, None), # Write with no data. + ]) + # Make ioctl call and return result data. + ioctl(self._device.fileno(), I2C_RDWR, request) + + def write_byte(self, addr, val): + """Write a single byte to the specified device.""" + assert self._device is not None, 'Bus must be opened before operations are made against it!' + self._select_device(addr) + data = bytearray(1) + data[0] = val & 0xFF + self._device.write(data) + + def write_byte_data(self, addr, cmd, val): + """Write a byte of data to the specified cmd register of the device. + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Construct a string of data to send with the command register and byte value. + data = bytearray(2) + data[0] = cmd & 0xFF + data[1] = val & 0xFF + # Send the data to the device. + self._select_device(addr) + self._device.write(data) + + def write_word_data(self, addr, cmd, val): + """Write a word (2 bytes) of data to the specified cmd register of the + device. Note that this will write the data in the endianness of the + processor running Python (typically little endian)! + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Construct a string of data to send with the command register and word value. + data = struct.pack('=BH', cmd & 0xFF, val & 0xFFFF) + # Send the data to the device. + self._select_device(addr) + self._device.write(data) + + def write_block_data(self, addr, cmd, vals): + """Write a block of data to the specified cmd register of the device. + The amount of data to write should be the first byte inside the vals + string/bytearray and that count of bytes of data to write should follow + it. + """ + # Just use the I2C block data write to write the provided values and + # their length as the first byte. + data = bytearray(len(vals)+1) + data[0] = len(vals) & 0xFF + data[1:] = vals[0:] + self.write_i2c_block_data(addr, cmd, data) + + def write_i2c_block_data(self, addr, cmd, vals): + """Write a buffer of data to the specified cmd register of the device. + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Construct a string of data to send, including room for the command register. + data = bytearray(len(vals)+1) + data[0] = cmd & 0xFF # Command register at the start. + data[1:] = vals[0:] # Copy in the block data (ugly but necessary to ensure + # the entire write happens in one transaction). + # Send the data to the device. + self._select_device(addr) + self._device.write(data) + + def process_call(self, addr, cmd, val): + """Perform a smbus process call by writing a word (2 byte) value to + the specified register of the device, and then reading a word of response + data (which is returned). + """ + assert self._device is not None, 'Bus must be opened before operations are made against it!' + # Build ctypes values to marshall between ioctl and Python. + data = create_string_buffer(struct.pack('=BH', cmd, val)) + result = c_uint16() + # Build ioctl request. + request = make_i2c_rdwr_data([ + (addr, 0, 3, cast(pointer(data), POINTER(c_uint8))), # Write data. + (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes). + ]) + # Make ioctl call and return result data. + ioctl(self._device.fileno(), I2C_RDWR, request) + # Note the python-smbus code appears to have a rather serious bug and + # does not return the result value! This is fixed below by returning it. + return result.value