Source code for newfocus8742.sim

import asyncio
import logging
import re
import random


from .protocol import NewFocus8742Protocol


logger = logging.getLogger(__name__)


[docs]class NewFocus8742Sim(NewFocus8742Protocol): channels = 4 def __init__(self): self.position = [0 for i in range(self.channels)] self.home = [0 for i in range(self.channels)] self.target = [0 for i in range(self.channels)] self.velocity = [2000 for i in range(self.channels)] self.acceleration = [100000 for i in range(self.channels)] self.pending = []
[docs] @classmethod async def connect(cls, *args, **kwargs): """Connect to a Newfocus/Newport 8742 controller simulation. Args: any: ignored Returns: NewFocus8742: Driver instance. """ return cls()
def __enter__(self): return self def __exit__(self, *exc): self.close() def close(self): if self.pending: raise ValueError("pending data {}".format(self.pending)) def _writeline(self, cmd): m = re.match(r"^(?P<xx>\d)?\s*\*?(?P<cmd>[a-zA-Z]+)\s*" r"(?P<nn>\d+(,\s*\d+)*)?(?P<ask>\?)?$", cmd) assert m d = m.groupdict() if d["ask"] == "?": f = getattr(self, "ask_{}".format(d["cmd"].lower()), None) else: f = getattr(self, "do_{}".format(d["cmd"].lower()), None) kwargs = {} if d["xx"] is not None: kwargs["xx"] = int(d["xx"]) if d["nn"]: args = tuple(int(i) for i in d["nn"].split(",")) else: args = () ret = None if f: ret = str(f(*args, **kwargs)) else: logger.warning("cmd ignored: %s", d["cmd"]) if d["ask"] == "?": assert ret self.pending.append(ret) async def _readline(self): return self.pending.pop(0) def ask_tb(self): return "0, NO ERROR" def ask_te(self): return 0 def ask_idn(self): return "Newfocus 8742, simulated" def do_va(self, nn, xx): assert 1 <= xx <= 4 self.velocity[xx - 1] = nn def ask_va(self, xx): assert 1 <= xx <= 4 return self.velocity[xx - 1] def do_pa(self, nn, xx): assert 1 <= xx <= 4 self.position[xx - 1] = nn def ask_pa(self, xx): assert 1 <= xx <= 4 return self.position[xx - 1] def do_pr(self, nn, xx): assert 1 <= xx <= 4 self.position[xx - 1] += nn def ask_pr(self, xx): assert 1 <= xx <= 4 return 0 def ask_tp(self, xx): assert 1 <= xx <= 4 return self.position[xx - 1] - self.home[xx - 1] def do_ac(self, nn, xx): assert 1 <= xx <= 4 self.acceleration[xx - 1] = nn def ask_ac(self, xx): assert 1 <= xx <= 4 return self.acceleration[xx - 1] def do_sm(self, xx=None): pass def do_ab(self, xx=None): pass def do_qm(self, *nn, xx): assert 1 <= xx <= 4 pass def ask_qm(self, xx): return 2 def ask_dh(self, xx): assert 1 <= xx <= 4 return self.home[xx - 1] def do_dh(self, *nn, xx): assert 1 <= xx <= 4 nn = nn[0] if nn else 0 self.position[xx - 1] += self.home[xx - 1] - nn self.home[xx - 1] = nn def ask_md(self, xx): assert 1 <= xx <= 4 return random.randint(0, 1) def do_mv(self, *nn, xx): assert 1 <= xx <= 4 nn = nn[0] if nn else 1 self.position[xx - 1] += nn def ask_sa(self): return 0 def ask_sc(self): return 0 def ask_sd(self): return 0 def ask_ve(self): return self.ask_idn() def ask_zz(self): return 0 def ask_gateway(self): return 0 def ask_netmask(self): return 0 def ask_hostname(self): return 0 def ask_ipaddr(self): return 0 def ask_ipmode(self): return 0 def ask_macaddr(self): return 0