Source code for newfocus8742.tcp
import logging
import asyncio
from .protocol import NewFocus8742Protocol
logger = logging.getLogger(__name__)
[docs]class NewFocus8742TCP(NewFocus8742Protocol):
eol_write = b"\r"
eol_read = b"\r\n"
def __init__(self, reader, writer):
self._reader = reader
self._writer = writer
[docs] @classmethod
async def connect(cls, host, port=23, **kwargs):
"""Connect to a Newfocus/Newport 8742 controller over Ethernet/TCP.
Args:
host (str): Hostname or IP address of the target device.
Returns:
NewFocus8742: Driver instance.
"""
reader, writer = await asyncio.open_connection(host, port, **kwargs)
# undocumented? garbage?
v = await reader.read(6)
logger.debug("identifier/serial (?): %s", v)
return cls(reader, writer)
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def close(self):
self._writer.close()
def _writeline(self, cmd):
self._writer.write(cmd.encode() + self.eol_write)
async def _readline(self):
r = await self._reader.readline()
assert r.endswith(self.eol_read)
return r[:-2].decode()