Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

USB2 Stick Driver #21

Merged
merged 4 commits into from
Jan 1, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions buildout.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ parts =
pep8
importchecker
sphinx
pyusb
develop = .
eggs = ant

Expand Down Expand Up @@ -48,3 +49,9 @@ recipe = zc.recipe.egg
eggs =
${buildout:eggs}
sphinx

[pyusb]
recipe = zc.recipe.egg
eggs =
${buildout:eggs}
pyusb
30 changes: 30 additions & 0 deletions demos/ant.core/02-capabilities-USB.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Interrogate stick for supported capabilities.

"""

import sys

from ant.core import driver
from ant.core import node

from config import *

# Initialize
stick = driver.USB2Driver(SERIAL, log=LOG, debug=DEBUG)
antnode = node.Node(stick)
antnode.start()

# Interrogate stick
# Note: This method will return immediately, as the stick's capabilities are
# interrogated on node initialization (node.start()) in order to set proper
# internal Node instance state.
capabilities = antnode.getCapabilities()

print 'Maximum channels:', capabilities[0]
print 'Maximum network keys:', capabilities[1]
print 'Standard options: %X' % capabilities[2][0]
print 'Advanced options: %X' % capabilities[2][1]

# Shutdown
antnode.stop()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def read(fname):
install_requires=[
'distribute',
'pyserial',
'pyusb',
'msgpack-python'
],
)
62 changes: 62 additions & 0 deletions src/ant/core/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@

import thread

# USB1 driver uses a USB<->Serial bridge
import serial
# USB2 driver uses direct USB connection. Requires PyUSB
import usb.core
import usb.util

from ant.core.exceptions import DriverError

from array import *


class Driver(object):
_lock = thread.allocate_lock()
Expand Down Expand Up @@ -173,3 +179,59 @@ def _write(self, data):
raise DriverError(str(e))

return count


class USB2Driver(Driver):
def _open(self):
# Most of this is straight from the PyUSB example documentation
dev = usb.core.find(idVendor=0x0fcf, idProduct=0x1008)

if dev is None:
raise DriverError('Could not open device (not found)')
dev.set_configuration()
cfg = dev.get_active_configuration()
interface_number = cfg[(0,0)].bInterfaceNumber
alternate_setting = usb.control.get_interface(dev, interface_number)
intf = usb.util.find_descriptor(
cfg, bInterfaceNumber = interface_number,
AlternateSetting = alternate_setting
)
usb.util.claim_interface(dev, interface_number)
ep_out = usb.util.find_descriptor(
intf,
custom_match = \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_OUT
)
assert ep_out is not None
ep_in = usb.util.find_descriptor(
intf,
custom_match = \
lambda e: \
usb.util.endpoint_direction(e.bEndpointAddress) == \
usb.util.ENDPOINT_IN
)
assert ep_in is not None
self._ep_out = ep_out
self._ep_in = ep_in
self._dev = dev
self._int = interface_number

def _close(self):
usb.util.release_interface(self._dev, self._int)

def _read(self, count):
arr_inp = array('B')
try:
arr_inp = self._ep_in.read(count)
except usb.core.USBError:
# Timeout errors seem to occasionally be expected
pass

return arr_inp.tostring()

def _write(self, data):
count = self._ep_out.write(data)

return count