quick update test
This commit is contained in:
84
640steppertest.py
Normal file
84
640steppertest.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import time
|
||||
from darkwater_640.darkwater_640 import dw_Controller, dw_Servo, dw_Motor
|
||||
|
||||
dw = dw_Controller( addr=0x60 )
|
||||
m1 = dw.getMotor(1)
|
||||
m2 = dw.getMotor(2)
|
||||
m3 = dw.getMotor(3)
|
||||
m4 = dw.getMotor(4)
|
||||
m5 = dw.getMotor(5)
|
||||
m6 = dw.getMotor(6)
|
||||
|
||||
m1.off()
|
||||
m2.off()
|
||||
m3.off()
|
||||
m4.off()
|
||||
m5.off()
|
||||
m6.off()
|
||||
time.sleep(1)
|
||||
|
||||
##time.sleep(10)
|
||||
print "Set forward - "
|
||||
print "Motor 1"
|
||||
m1.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Motor 2"
|
||||
m2.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Motor 3"
|
||||
m3.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Motor 4"
|
||||
m4.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Motor 5"
|
||||
m5.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Motor 6"
|
||||
m6.setMotorSpeed(255)
|
||||
time.sleep(1)
|
||||
print "Stopping - "
|
||||
print "Motor 1"
|
||||
m1.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Motor 2"
|
||||
m2.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Motor 3"
|
||||
m3.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Motor 4"
|
||||
m4.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Motor 5"
|
||||
m5.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Motor 6"
|
||||
m6.setMotorSpeed(0)
|
||||
time.sleep(1)
|
||||
print "Set reverse - "
|
||||
print "Motor 1"
|
||||
m1.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "Motor 2"
|
||||
m2.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "Motor 3"
|
||||
m3.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "Motor 4"
|
||||
m4.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "Motor 5"
|
||||
m5.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "Motor 6"
|
||||
m6.setMotorSpeed(-255)
|
||||
time.sleep(1)
|
||||
print "All off"
|
||||
m1.off()
|
||||
m2.off()
|
||||
m3.off()
|
||||
m4.off()
|
||||
m5.off()
|
||||
m6.off()
|
||||
@@ -93,8 +93,8 @@ class Device(object):
|
||||
self._address = address
|
||||
if i2c_interface is None:
|
||||
# Use pure python I2C interface if none is specified.
|
||||
import Adafruit_PureIO.smbus
|
||||
self._bus = Adafruit_PureIO.smbus.SMBus(busnum)
|
||||
import smbus
|
||||
self._bus = smbus.SMBus(busnum)
|
||||
else:
|
||||
# Otherwise use the provided class to create an smbus interface.
|
||||
self._bus = i2c_interface(busnum)
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .darkwater_640 import dw_Motor, dw_Controller
|
||||
from .darkwater_640 import dw_Motor, dw_Servo, dw_Controller
|
||||
|
||||
@@ -5,167 +5,194 @@ from PCA9685 import PCA9685
|
||||
import time
|
||||
import math
|
||||
|
||||
# class Adafruit_StepperMotor:
|
||||
# MICROSTEPS = 8
|
||||
# MICROSTEP_CURVE = [0, 50, 98, 142, 180, 212, 236, 250, 255]
|
||||
#
|
||||
# #MICROSTEPS = 16
|
||||
# # a sinusoidal curve NOT LINEAR!
|
||||
# #MICROSTEP_CURVE = [0, 25, 50, 74, 98, 120, 141, 162, 180, 197, 212, 225, 236, 244, 250, 253, 255]
|
||||
#
|
||||
# def __init__(self, controller, num, steps=200):
|
||||
# self.MC = controller
|
||||
# self.revsteps = steps
|
||||
# self.motornum = num
|
||||
# self.sec_per_step = 0.1
|
||||
# self.steppingcounter = 0
|
||||
# self.currentstep = 0
|
||||
#
|
||||
# num -= 1
|
||||
#
|
||||
# if (num == 0):
|
||||
# self.PWMA = 8
|
||||
# self.AIN2 = 9
|
||||
# self.AIN1 = 10
|
||||
# self.PWMB = 13
|
||||
# self.BIN2 = 12
|
||||
# self.BIN1 = 11
|
||||
# elif (num == 1):
|
||||
# self.PWMA = 2
|
||||
# self.AIN2 = 3
|
||||
# self.AIN1 = 4
|
||||
# self.PWMB = 7
|
||||
# self.BIN2 = 6
|
||||
# self.BIN1 = 5
|
||||
# else:
|
||||
# raise NameError('MotorHAT Stepper must be between 1 and 2 inclusive')
|
||||
#
|
||||
# def setSpeed(self, rpm):
|
||||
# self.sec_per_step = 60.0 / (self.revsteps * rpm)
|
||||
# self.steppingcounter = 0
|
||||
#
|
||||
# def oneStep(self, dir, style):
|
||||
# pwm_a = pwm_b = 255
|
||||
#
|
||||
# # first determine what sort of stepping procedure we're up to
|
||||
# if (style == Adafruit_MotorHAT.SINGLE):
|
||||
# if ((self.currentstep/(self.MICROSTEPS/2)) % 2):
|
||||
# # we're at an odd step, weird
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += self.MICROSTEPS/2
|
||||
# else:
|
||||
# self.currentstep -= self.MICROSTEPS/2
|
||||
# else:
|
||||
# # go to next even step
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += self.MICROSTEPS
|
||||
# else:
|
||||
# self.currentstep -= self.MICROSTEPS
|
||||
# if (style == Adafruit_MotorHAT.DOUBLE):
|
||||
# if not (self.currentstep/(self.MICROSTEPS/2) % 2):
|
||||
# # we're at an even step, weird
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += self.MICROSTEPS/2
|
||||
# else:
|
||||
# self.currentstep -= self.MICROSTEPS/2
|
||||
# else:
|
||||
# # go to next odd step
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += self.MICROSTEPS
|
||||
# else:
|
||||
# self.currentstep -= self.MICROSTEPS
|
||||
# if (style == Adafruit_MotorHAT.INTERLEAVE):
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += self.MICROSTEPS/2
|
||||
# else:
|
||||
# self.currentstep -= self.MICROSTEPS/2
|
||||
#
|
||||
# if (style == Adafruit_MotorHAT.MICROSTEP):
|
||||
# if (dir == Adafruit_MotorHAT.FORWARD):
|
||||
# self.currentstep += 1
|
||||
# else:
|
||||
# self.currentstep -= 1
|
||||
#
|
||||
# # go to next 'step' and wrap around
|
||||
# self.currentstep += self.MICROSTEPS * 4
|
||||
# self.currentstep %= self.MICROSTEPS * 4
|
||||
#
|
||||
# pwm_a = pwm_b = 0
|
||||
# if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS):
|
||||
# pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS - self.currentstep]
|
||||
# pwm_b = self.MICROSTEP_CURVE[self.currentstep]
|
||||
# elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2):
|
||||
# pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS]
|
||||
# pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*2 - self.currentstep]
|
||||
# elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3):
|
||||
# pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS*3 - self.currentstep]
|
||||
# pwm_b = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*2]
|
||||
# elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4):
|
||||
# pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*3]
|
||||
# pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*4 - self.currentstep]
|
||||
#
|
||||
#
|
||||
# # go to next 'step' and wrap around
|
||||
# self.currentstep += self.MICROSTEPS * 4
|
||||
# self.currentstep %= self.MICROSTEPS * 4
|
||||
#
|
||||
# # only really used for microstepping, otherwise always on!
|
||||
# self.MC._pwm.setPWM(self.PWMA, 0, pwm_a*16)
|
||||
# self.MC._pwm.setPWM(self.PWMB, 0, pwm_b*16)
|
||||
#
|
||||
# # set up coil energizing!
|
||||
# coils = [0, 0, 0, 0]
|
||||
#
|
||||
# if (style == Adafruit_MotorHAT.MICROSTEP):
|
||||
# if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS):
|
||||
# coils = [1, 1, 0, 0]
|
||||
# elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2):
|
||||
# coils = [0, 1, 1, 0]
|
||||
# elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3):
|
||||
# coils = [0, 0, 1, 1]
|
||||
# elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4):
|
||||
# coils = [1, 0, 0, 1]
|
||||
# else:
|
||||
# step2coils = [ [1, 0, 0, 0],
|
||||
# [1, 1, 0, 0],
|
||||
# [0, 1, 0, 0],
|
||||
# [0, 1, 1, 0],
|
||||
# [0, 0, 1, 0],
|
||||
# [0, 0, 1, 1],
|
||||
# [0, 0, 0, 1],
|
||||
# [1, 0, 0, 1] ]
|
||||
# coils = step2coils[self.currentstep/(self.MICROSTEPS/2)]
|
||||
#
|
||||
# #print "coils state = " + str(coils)
|
||||
# self.MC.setPin(self.AIN2, coils[0])
|
||||
# self.MC.setPin(self.BIN1, coils[1])
|
||||
# self.MC.setPin(self.AIN1, coils[2])
|
||||
# self.MC.setPin(self.BIN2, coils[3])
|
||||
#
|
||||
# return self.currentstep
|
||||
#
|
||||
# def step(self, steps, direction, stepstyle):
|
||||
# s_per_s = self.sec_per_step
|
||||
# lateststep = 0
|
||||
#
|
||||
# if (stepstyle == Adafruit_MotorHAT.INTERLEAVE):
|
||||
# s_per_s = s_per_s / 2.0
|
||||
# if (stepstyle == Adafruit_MotorHAT.MICROSTEP):
|
||||
# s_per_s /= self.MICROSTEPS
|
||||
# steps *= self.MICROSTEPS
|
||||
#
|
||||
# print s_per_s, " sec per step"
|
||||
#
|
||||
# for s in range(steps):
|
||||
# lateststep = self.oneStep(direction, stepstyle)
|
||||
# time.sleep(s_per_s)
|
||||
#
|
||||
# if (stepstyle == Adafruit_MotorHAT.MICROSTEP):
|
||||
# # this is an edge case, if we are in between full steps, lets just keep going
|
||||
# # so we end on a full step
|
||||
# while (lateststep != 0) and (lateststep != self.MICROSTEPS):
|
||||
# lateststep = self.oneStep(dir, stepstyle)
|
||||
# time.sleep(s_per_s)
|
||||
class dw_Stepper:
|
||||
MICROSTEPS = 8
|
||||
MICROSTEP_CURVE = [0, 50, 98, 142, 180, 212, 236, 250, 255]
|
||||
|
||||
#MICROSTEPS = 16
|
||||
# a sinusoidal curve NOT LINEAR!
|
||||
#MICROSTEP_CURVE = [0, 25, 50, 74, 98, 120, 141, 162, 180, 197, 212, 225, 236, 244, 250, 253, 255]
|
||||
|
||||
def __init__(self, controller, num, steps=200):
|
||||
self.speed = 0
|
||||
self.MC = controller
|
||||
self.motornum = num
|
||||
modepin = in1 = in2 = 0
|
||||
|
||||
self.revsteps = steps
|
||||
self.sec_per_step = 0.1
|
||||
self.steppingcounter = 0
|
||||
self.currentstep = 0
|
||||
|
||||
if (num == 0):
|
||||
ain2 = 2 #phase
|
||||
ain1 = 3 #enable
|
||||
bin2 = 4 #phase
|
||||
bin1 = 5 #enable
|
||||
elif (num == 1):
|
||||
ain2 = 6 #phase
|
||||
ain1 = 7 #enable
|
||||
bin2 = 8 #phase
|
||||
bin1 = 9 #enable
|
||||
elif (num == 2):
|
||||
ain2 = 10 #phase
|
||||
ain1 = 11 #enable
|
||||
bin2 = 12 #phase
|
||||
bin1 = 13 #enable
|
||||
else:
|
||||
raise NameError('MotorHAT Stepper must be between 1 and 3 inclusive')
|
||||
|
||||
self.PHpinA = ain2
|
||||
self.ENpinA = ain1
|
||||
self.PHpinB = bin2
|
||||
self.ENpinB = bin1
|
||||
# switch off both drivers
|
||||
self.run(dw_Controller.RELEASE, 0)
|
||||
|
||||
def run(self, command, speed = 0):
|
||||
if not self.MC:
|
||||
return
|
||||
if (command == dw_Controller.FORWARD):
|
||||
self.MC.setPin(self.PHpin, 0)
|
||||
self.MC._pwm.set_pwm(self.ENpin, 0, speed*16)
|
||||
if (command == dw_Controller.BACKWARD):
|
||||
self.MC.setPin(self.PHpin, 1)
|
||||
self.MC._pwm.set_pwm(self.ENpin, 0, speed*16)
|
||||
if (command == dw_Controller.RELEASE):
|
||||
self.MC.setPin(self.PHpinA, 0)
|
||||
self.MC.setPin(self.ENpinA, 0)
|
||||
self.MC.setPin(self.PHpinB, 0)
|
||||
self.MC.setPin(self.ENpinB, 0)
|
||||
|
||||
def off(self):
|
||||
self.run(dw_Controller.RELEASE, 0)
|
||||
|
||||
def setSpeed(self, rpm):
|
||||
self.sec_per_step = 60.0 / (self.revsteps * rpm)
|
||||
self.steppingcounter = 0
|
||||
|
||||
def oneStep(self, dir, style):
|
||||
pwm_a = pwm_b = 255
|
||||
|
||||
# first determine what sort of stepping procedure we're up to
|
||||
if (style == dw_Controller.SINGLE):
|
||||
if ((self.currentstep/(self.MICROSTEPS/2)) % 2):
|
||||
# we're at an odd step, weird
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += self.MICROSTEPS/2
|
||||
else:
|
||||
self.currentstep -= self.MICROSTEPS/2
|
||||
else:
|
||||
# go to next even step
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += self.MICROSTEPS
|
||||
else:
|
||||
self.currentstep -= self.MICROSTEPS
|
||||
if (style == dw_Controller.DOUBLE):
|
||||
if not (self.currentstep/(self.MICROSTEPS/2) % 2):
|
||||
# we're at an even step, weird
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += self.MICROSTEPS/2
|
||||
else:
|
||||
self.currentstep -= self.MICROSTEPS/2
|
||||
else:
|
||||
# go to next odd step
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += self.MICROSTEPS
|
||||
else:
|
||||
self.currentstep -= self.MICROSTEPS
|
||||
if (style == dw_Controller.INTERLEAVE):
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += self.MICROSTEPS/2
|
||||
else:
|
||||
self.currentstep -= self.MICROSTEPS/2
|
||||
|
||||
if (style == dw_Controller.MICROSTEP):
|
||||
if (dir == dw_Controller.FORWARD):
|
||||
self.currentstep += 1
|
||||
else:
|
||||
self.currentstep -= 1
|
||||
|
||||
# go to next 'step' and wrap around
|
||||
self.currentstep += self.MICROSTEPS * 4
|
||||
self.currentstep %= self.MICROSTEPS * 4
|
||||
|
||||
pwm_a = pwm_b = 0
|
||||
if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS):
|
||||
pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS - self.currentstep]
|
||||
pwm_b = self.MICROSTEP_CURVE[self.currentstep]
|
||||
elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2):
|
||||
pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS]
|
||||
pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*2 - self.currentstep]
|
||||
elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3):
|
||||
pwm_a = self.MICROSTEP_CURVE[self.MICROSTEPS*3 - self.currentstep]
|
||||
pwm_b = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*2]
|
||||
elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4):
|
||||
pwm_a = self.MICROSTEP_CURVE[self.currentstep - self.MICROSTEPS*3]
|
||||
pwm_b = self.MICROSTEP_CURVE[self.MICROSTEPS*4 - self.currentstep]
|
||||
|
||||
|
||||
# go to next 'step' and wrap around
|
||||
self.currentstep += self.MICROSTEPS * 4
|
||||
self.currentstep %= self.MICROSTEPS * 4
|
||||
|
||||
# only really used for microstepping, otherwise always on!
|
||||
self.MC._pwm.setPWM(self.PWMA, 0, pwm_a*16)
|
||||
self.MC._pwm.setPWM(self.PWMB, 0, pwm_b*16)
|
||||
|
||||
# set up coil energizing!
|
||||
coils = [0, 0, 0, 0]
|
||||
|
||||
if (style == dw_Controller.MICROSTEP):
|
||||
if (self.currentstep >= 0) and (self.currentstep < self.MICROSTEPS):
|
||||
coils = [1, 1, 0, 0]
|
||||
elif (self.currentstep >= self.MICROSTEPS) and (self.currentstep < self.MICROSTEPS*2):
|
||||
coils = [0, 1, 1, 0]
|
||||
elif (self.currentstep >= self.MICROSTEPS*2) and (self.currentstep < self.MICROSTEPS*3):
|
||||
coils = [0, 0, 1, 1]
|
||||
elif (self.currentstep >= self.MICROSTEPS*3) and (self.currentstep < self.MICROSTEPS*4):
|
||||
coils = [1, 0, 0, 1]
|
||||
else:
|
||||
step2coils = [ [1, 0, 0, 0],
|
||||
[1, 1, 0, 0],
|
||||
[0, 1, 0, 0],
|
||||
[0, 1, 1, 0],
|
||||
[0, 0, 1, 0],
|
||||
[0, 0, 1, 1],
|
||||
[0, 0, 0, 1],
|
||||
[1, 0, 0, 1] ]
|
||||
coils = step2coils[self.currentstep/(self.MICROSTEPS/2)]
|
||||
|
||||
#print "coils state = " + str(coils)
|
||||
self.MC.setPin(self.AIN2, coils[0])
|
||||
self.MC.setPin(self.BIN1, coils[1])
|
||||
self.MC.setPin(self.AIN1, coils[2])
|
||||
self.MC.setPin(self.BIN2, coils[3])
|
||||
|
||||
return self.currentstep
|
||||
|
||||
def step(self, steps, direction, stepstyle):
|
||||
s_per_s = self.sec_per_step
|
||||
lateststep = 0
|
||||
|
||||
if (stepstyle == dw_Controller.INTERLEAVE):
|
||||
s_per_s = s_per_s / 2.0
|
||||
if (stepstyle == dw_Controller.MICROSTEP):
|
||||
s_per_s /= self.MICROSTEPS
|
||||
steps *= self.MICROSTEPS
|
||||
|
||||
print s_per_s, " sec per step"
|
||||
|
||||
for s in range(steps):
|
||||
lateststep = self.oneStep(direction, stepstyle)
|
||||
time.sleep(s_per_s)
|
||||
|
||||
if (stepstyle == dw_Controller.MICROSTEP):
|
||||
# this is an edge case, if we are in between full steps, lets just keep going
|
||||
# so we end on a full step
|
||||
while (lateststep != 0) and (lateststep != self.MICROSTEPS):
|
||||
lateststep = self.oneStep(dir, stepstyle)
|
||||
time.sleep(s_per_s)
|
||||
|
||||
class dw_Motor:
|
||||
def __init__(self, controller, num):
|
||||
@@ -306,11 +333,13 @@ class dw_Controller:
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setup(27, GPIO.OUT)
|
||||
GPIO.output(27, GPIO.HIGH)
|
||||
GPIO.output(27, GPIO.HIGH) # set for en/phase mode - low = in/in mode
|
||||
|
||||
self.motors = [ dw_Motor(self, m) for m in range(6) ]
|
||||
self.servos = [ dw_Servo(self, m, freq) for m in range(2) ]
|
||||
|
||||
self.steppers = [ dw_Stepper(self, m) for m in range(3) ]
|
||||
|
||||
def setPin(self, pin, value):
|
||||
if (pin < 0) or (pin > 15):
|
||||
raise NameError('PWM pin must be between 0 and 15 inclusive')
|
||||
|
||||
294
darkwater_640/smbus.py
Normal file
294
darkwater_640/smbus.py
Normal file
@@ -0,0 +1,294 @@
|
||||
# Copyright (c) 2016 Adafruit Industries
|
||||
# Author: Tony DiCola
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
from ctypes import *
|
||||
from fcntl import ioctl
|
||||
import struct
|
||||
|
||||
# I2C C API constants (from linux kernel headers)
|
||||
I2C_M_TEN = 0x0010 # this is a ten bit chip address
|
||||
I2C_M_RD = 0x0001 # read data, from slave to master
|
||||
I2C_M_STOP = 0x8000 # if I2C_FUNC_PROTOCOL_MANGLING
|
||||
I2C_M_NOSTART = 0x4000 # if I2C_FUNC_NOSTART
|
||||
I2C_M_REV_DIR_ADDR = 0x2000 # if I2C_FUNC_PROTOCOL_MANGLING
|
||||
I2C_M_IGNORE_NAK = 0x1000 # if I2C_FUNC_PROTOCOL_MANGLING
|
||||
I2C_M_NO_RD_ACK = 0x0800 # if I2C_FUNC_PROTOCOL_MANGLING
|
||||
I2C_M_RECV_LEN = 0x0400 # length will be first received byte
|
||||
|
||||
I2C_SLAVE = 0x0703 # Use this slave address
|
||||
I2C_SLAVE_FORCE = 0x0706 # Use this slave address, even if
|
||||
# is already in use by a driver!
|
||||
I2C_TENBIT = 0x0704 # 0 for 7 bit addrs, != 0 for 10 bit
|
||||
I2C_FUNCS = 0x0705 # Get the adapter functionality mask
|
||||
I2C_RDWR = 0x0707 # Combined R/W transfer (one STOP only)
|
||||
I2C_PEC = 0x0708 # != 0 to use PEC with SMBus
|
||||
I2C_SMBUS = 0x0720 # SMBus transfer
|
||||
|
||||
|
||||
# ctypes versions of I2C structs defined by kernel.
|
||||
class i2c_msg(Structure):
|
||||
_fields_ = [
|
||||
('addr', c_uint16),
|
||||
('flags', c_uint16),
|
||||
('len', c_uint16),
|
||||
('buf', POINTER(c_uint8))
|
||||
]
|
||||
|
||||
class i2c_rdwr_ioctl_data(Structure):
|
||||
_fields_ = [
|
||||
('msgs', POINTER(i2c_msg)),
|
||||
('nmsgs', c_uint32)
|
||||
]
|
||||
|
||||
|
||||
def make_i2c_rdwr_data(messages):
|
||||
"""Utility function to create and return an i2c_rdwr_ioctl_data structure
|
||||
populated with a list of specified I2C messages. The messages parameter
|
||||
should be a list of tuples which represent the individual I2C messages to
|
||||
send in this transaction. Tuples should contain 4 elements: address value,
|
||||
flags value, buffer length, ctypes c_uint8 pointer to buffer.
|
||||
"""
|
||||
# Create message array and populate with provided data.
|
||||
msg_data_type = i2c_msg*len(messages)
|
||||
msg_data = msg_data_type()
|
||||
for i, m in enumerate(messages):
|
||||
msg_data[i].addr = m[0] & 0x7F
|
||||
msg_data[i].flags = m[1]
|
||||
msg_data[i].len = m[2]
|
||||
msg_data[i].buf = m[3]
|
||||
# Now build the data structure.
|
||||
data = i2c_rdwr_ioctl_data()
|
||||
data.msgs = msg_data
|
||||
data.nmsgs = len(messages)
|
||||
return data
|
||||
|
||||
|
||||
# Create an interface that mimics the Python SMBus API.
|
||||
class SMBus(object):
|
||||
"""I2C interface that mimics the Python SMBus API but is implemented with
|
||||
pure Python calls to ioctl and direct /dev/i2c device access.
|
||||
"""
|
||||
|
||||
def __init__(self, bus=None):
|
||||
"""Create a new smbus instance. Bus is an optional parameter that
|
||||
specifies the I2C bus number to use, for example 1 would use device
|
||||
/dev/i2c-1. If bus is not specified then the open function should be
|
||||
called to open the bus.
|
||||
"""
|
||||
self._device = None
|
||||
if bus is not None:
|
||||
self.open(bus)
|
||||
|
||||
def __del__(self):
|
||||
"""Clean up any resources used by the SMBus instance."""
|
||||
self.close()
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager enter function."""
|
||||
# Just return this object so it can be used in a with statement, like
|
||||
# with SMBus(1) as bus:
|
||||
# # do stuff!
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit function, ensures resources are cleaned up."""
|
||||
self.close()
|
||||
return False # Don't suppress exceptions.
|
||||
|
||||
def open(self, bus):
|
||||
"""Open the smbus interface on the specified bus."""
|
||||
# Close the device if it's already open.
|
||||
if self._device is not None:
|
||||
self.close()
|
||||
# Try to open the file for the specified bus. Must turn off buffering
|
||||
# or else Python 3 fails (see: https://bugs.python.org/issue20074)
|
||||
self._device = open('/dev/i2c-{0}'.format(bus), 'r+b', buffering=0)
|
||||
# TODO: Catch IOError and throw a better error message that describes
|
||||
# what's wrong (i.e. I2C may not be enabled or the bus doesn't exist).
|
||||
|
||||
def close(self):
|
||||
"""Close the smbus connection. You cannot make any other function
|
||||
calls on the bus unless open is called!"""
|
||||
if self._device is not None:
|
||||
self._device.close()
|
||||
self._device = None
|
||||
|
||||
def _select_device(self, addr):
|
||||
"""Set the address of the device to communicate with on the I2C bus."""
|
||||
ioctl(self._device.fileno(), I2C_SLAVE, addr & 0x7F)
|
||||
|
||||
def read_byte(self, addr):
|
||||
"""Read a single byte from the specified device."""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
self._select_device(addr)
|
||||
return ord(self._device.read(1))
|
||||
|
||||
def read_byte_data(self, addr, cmd):
|
||||
"""Read a single byte from the specified cmd register of the device."""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Build ctypes values to marshall between ioctl and Python.
|
||||
reg = c_uint8(cmd)
|
||||
result = c_uint8()
|
||||
# Build ioctl request.
|
||||
request = make_i2c_rdwr_data([
|
||||
(addr, 0, 1, pointer(reg)), # Write cmd register.
|
||||
(addr, I2C_M_RD, 1, pointer(result)) # Read 1 byte as result.
|
||||
])
|
||||
# Make ioctl call and return result data.
|
||||
ioctl(self._device.fileno(), I2C_RDWR, request)
|
||||
return result.value
|
||||
|
||||
def read_word_data(self, addr, cmd):
|
||||
"""Read a word (2 bytes) from the specified cmd register of the device.
|
||||
Note that this will interpret data using the endianness of the processor
|
||||
running Python (typically little endian)!
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Build ctypes values to marshall between ioctl and Python.
|
||||
reg = c_uint8(cmd)
|
||||
result = c_uint16()
|
||||
# Build ioctl request.
|
||||
request = make_i2c_rdwr_data([
|
||||
(addr, 0, 1, pointer(reg)), # Write cmd register.
|
||||
(addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes).
|
||||
])
|
||||
# Make ioctl call and return result data.
|
||||
ioctl(self._device.fileno(), I2C_RDWR, request)
|
||||
return result.value
|
||||
|
||||
def read_block_data(self, addr, cmd):
|
||||
"""Perform a block read from the specified cmd register of the device.
|
||||
The amount of data read is determined by the first byte send back by
|
||||
the device. Data is returned as a bytearray.
|
||||
"""
|
||||
# TODO: Unfortunately this will require calling the low level I2C
|
||||
# access ioctl to trigger a proper read_block_data. The amount of data
|
||||
# returned isn't known until the device starts responding so an I2C_RDWR
|
||||
# ioctl won't work.
|
||||
raise NotImplementedError()
|
||||
|
||||
def read_i2c_block_data(self, addr, cmd, length=32):
|
||||
"""Perform a read from the specified cmd register of device. Length number
|
||||
of bytes (default of 32) will be read and returned as a bytearray.
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Build ctypes values to marshall between ioctl and Python.
|
||||
reg = c_uint8(cmd)
|
||||
result = create_string_buffer(length)
|
||||
# Build ioctl request.
|
||||
request = make_i2c_rdwr_data([
|
||||
(addr, 0, 1, pointer(reg)), # Write cmd register.
|
||||
(addr, I2C_M_RD, length, cast(result, POINTER(c_uint8))) # Read data.
|
||||
])
|
||||
# Make ioctl call and return result data.
|
||||
ioctl(self._device.fileno(), I2C_RDWR, request)
|
||||
return bytearray(result.raw) # Use .raw instead of .value which will stop at a null byte!
|
||||
|
||||
def write_quick(self, addr):
|
||||
"""Write a single byte to the specified device."""
|
||||
# What a strange function, from the python-smbus source this appears to
|
||||
# just write a single byte that initiates a write to the specified device
|
||||
# address (but writes no data!). The functionality is duplicated below
|
||||
# but the actual use case for this is unknown.
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Build ioctl request.
|
||||
request = make_i2c_rdwr_data([
|
||||
(addr, 0, 0, None), # Write with no data.
|
||||
])
|
||||
# Make ioctl call and return result data.
|
||||
ioctl(self._device.fileno(), I2C_RDWR, request)
|
||||
|
||||
def write_byte(self, addr, val):
|
||||
"""Write a single byte to the specified device."""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
self._select_device(addr)
|
||||
data = bytearray(1)
|
||||
data[0] = val & 0xFF
|
||||
self._device.write(data)
|
||||
|
||||
def write_byte_data(self, addr, cmd, val):
|
||||
"""Write a byte of data to the specified cmd register of the device.
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Construct a string of data to send with the command register and byte value.
|
||||
data = bytearray(2)
|
||||
data[0] = cmd & 0xFF
|
||||
data[1] = val & 0xFF
|
||||
# Send the data to the device.
|
||||
self._select_device(addr)
|
||||
self._device.write(data)
|
||||
|
||||
def write_word_data(self, addr, cmd, val):
|
||||
"""Write a word (2 bytes) of data to the specified cmd register of the
|
||||
device. Note that this will write the data in the endianness of the
|
||||
processor running Python (typically little endian)!
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Construct a string of data to send with the command register and word value.
|
||||
data = struct.pack('=BH', cmd & 0xFF, val & 0xFFFF)
|
||||
# Send the data to the device.
|
||||
self._select_device(addr)
|
||||
self._device.write(data)
|
||||
|
||||
def write_block_data(self, addr, cmd, vals):
|
||||
"""Write a block of data to the specified cmd register of the device.
|
||||
The amount of data to write should be the first byte inside the vals
|
||||
string/bytearray and that count of bytes of data to write should follow
|
||||
it.
|
||||
"""
|
||||
# Just use the I2C block data write to write the provided values and
|
||||
# their length as the first byte.
|
||||
data = bytearray(len(vals)+1)
|
||||
data[0] = len(vals) & 0xFF
|
||||
data[1:] = vals[0:]
|
||||
self.write_i2c_block_data(addr, cmd, data)
|
||||
|
||||
def write_i2c_block_data(self, addr, cmd, vals):
|
||||
"""Write a buffer of data to the specified cmd register of the device.
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Construct a string of data to send, including room for the command register.
|
||||
data = bytearray(len(vals)+1)
|
||||
data[0] = cmd & 0xFF # Command register at the start.
|
||||
data[1:] = vals[0:] # Copy in the block data (ugly but necessary to ensure
|
||||
# the entire write happens in one transaction).
|
||||
# Send the data to the device.
|
||||
self._select_device(addr)
|
||||
self._device.write(data)
|
||||
|
||||
def process_call(self, addr, cmd, val):
|
||||
"""Perform a smbus process call by writing a word (2 byte) value to
|
||||
the specified register of the device, and then reading a word of response
|
||||
data (which is returned).
|
||||
"""
|
||||
assert self._device is not None, 'Bus must be opened before operations are made against it!'
|
||||
# Build ctypes values to marshall between ioctl and Python.
|
||||
data = create_string_buffer(struct.pack('=BH', cmd, val))
|
||||
result = c_uint16()
|
||||
# Build ioctl request.
|
||||
request = make_i2c_rdwr_data([
|
||||
(addr, 0, 3, cast(pointer(data), POINTER(c_uint8))), # Write data.
|
||||
(addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8))) # Read word (2 bytes).
|
||||
])
|
||||
# Make ioctl call and return result data.
|
||||
ioctl(self._device.fileno(), I2C_RDWR, request)
|
||||
# Note the python-smbus code appears to have a rather serious bug and
|
||||
# does not return the result value! This is fixed below by returning it.
|
||||
return result.value
|
||||
Reference in New Issue
Block a user