Source code for newfocus8742.usb
import usb.core
import usb.util
from .protocol import NewFocus8742Protocol
[docs]class NewFocus8742USB(NewFocus8742Protocol):
eol_write = b"\r"
eol_read = b"\r\n"
def __init__(self, dev):
self.dev = dev
# dev.set_configuration() # breaks the second invocation
cfg = dev.get_active_configuration()
intf = cfg[(0, 0)]
self.ep_out = usb.util.find_descriptor(intf,
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_OUT)
assert self.ep_out is not None
assert self.ep_out.wMaxPacketSize == 64
self.ep_in = usb.util.find_descriptor(intf,
custom_match=lambda e:
usb.util.endpoint_direction(e.bEndpointAddress) ==
usb.util.ENDPOINT_IN)
assert self.ep_in is not None
assert self.ep_in.wMaxPacketSize == 64
self.flush()
[docs] @classmethod
async def connect(cls, idVendor=0x104d, idProduct=0x4000, **kwargs):
"""Connect to a Newfocus/Newport 8742 controller over USB.
Args:
**kwargs: passed to `usb.core.find`
Returns:
NewFocus8742: Driver instance.
"""
dev = usb.core.find(idProduct=idProduct, idVendor=idVendor,
**kwargs)
if dev is None:
raise ValueError("Device not found")
return cls(dev)
[docs] def flush(self):
"""Drain the input buffer from read data."""
while True:
try:
self.ep_in.read(64, timeout=10)
except usb.core.USBError:
break
def close(self):
usb.util.dispose_resources(self.dev)
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def _writeline(self, cmd):
self.ep_out.write(cmd.encode() + self.eol_write)
async def _readline(self):
# This is obviously not asynchronous
r = self.ep_in.read(64).tobytes()
assert r.endswith(self.eol_read)
r = r[:-2].decode()
return r