diff options
author | Paul Sokolovsky <pfalcon@users.sourceforge.net> | 2017-04-04 17:46:02 +0300 |
---|---|---|
committer | Paul Sokolovsky <pfalcon@users.sourceforge.net> | 2017-04-04 17:46:02 +0300 |
commit | 647e72ca63a345a5d6de16fe359bbc3b7c6615ec (patch) | |
tree | 385288a083a2deda810447e7e7a6c6177381b412 | |
parent | 58168c8e6bcf3619d281a8c877ef20e5efe049d2 (diff) | |
download | micropython-647e72ca63a345a5d6de16fe359bbc3b7c6615ec.tar.gz micropython-647e72ca63a345a5d6de16fe359bbc3b7c6615ec.zip |
tools/pyboard: Add "exec" and "execpty" pseudo-devices support.
This allows to execute a command and communicate with its stdin/stdout
via pipes ("exec") or with command-created pseudo-terminal ("execpty"),
to emulate serial access. Immediate usecase is controlling a QEMU process
which emulates board's serial via normal console, but it could be used
e.g. with helper binaries to access real board over other hadware
protocols, etc.
An example of device specification for these cases is:
--device exec:../zephyr/qemu.sh
--device execpty:../zephyr/qemu2.sh
Where qemu.sh contains long-long qemu startup line, or calls another
command. There's a special support in this patch for running the command
in a new terminal session, to support shell wrappers like that (without
new terminal session, only wrapper script would be terminated, but its
child processes would continue to run).
-rwxr-xr-x | tools/pyboard.py | 85 |
1 files changed, 84 insertions, 1 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py index f368455f5e..131634c906 100755 --- a/tools/pyboard.py +++ b/tools/pyboard.py @@ -39,6 +39,7 @@ Or: import sys import time +import os try: stdout = sys.stdout.buffer @@ -116,9 +117,91 @@ class TelnetToSerial: else: return n_waiting + +class ProcessToSerial: + "Execute a process and emulate serial connection using its stdin/stdout." + + def __init__(self, cmd): + import subprocess + self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid, + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + # Initially was implemented with selectors, but that adds Python3 + # dependency. However, there can be race conditions communicating + # with a particular child process (like QEMU), and selectors may + # still work better in that case, so left inplace for now. + # + #import selectors + #self.sel = selectors.DefaultSelector() + #self.sel.register(self.subp.stdout, selectors.EVENT_READ) + + import select + self.poll = select.poll() + self.poll.register(self.subp.stdout.fileno()) + + def close(self): + import signal + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + data = b"" + while len(data) < size: + data += self.subp.stdout.read(size - len(data)) + return data + + def write(self, data): + self.subp.stdin.write(data) + return len(data) + + def inWaiting(self): + #res = self.sel.select(0) + res = self.poll.poll(0) + if res: + return 1 + return 0 + + +class ProcessPtyToTerminal: + """Execute a process which creates a PTY and prints slave PTY as + first line of its output, and emulate serial connection using + this PTY.""" + + def __init__(self, cmd): + import subprocess + import re + import serial + self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + pty_line = self.subp.stderr.readline().decode("utf-8") + m = re.search(r"/dev/pts/[0-9]+", pty_line) + if not m: + print("Error: unable to find PTY device in startup line:", pty_line) + self.close() + sys.exit(1) + pty = m.group() + self.ser = serial.Serial(pty, interCharTimeout=1) + + def close(self): + import signal + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + return self.ser.read(size) + + def write(self, data): + return self.ser.write(data) + + def inWaiting(self): + return self.ser.inWaiting() + + class Pyboard: def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0): - if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: + if device.startswith("exec:"): + self.serial = ProcessToSerial(device[len("exec:"):]) + elif device.startswith("execpty:"): + self.serial = ProcessPtyToTerminal(device[len("qemupty:"):]) + elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: # device looks like an IP address self.serial = TelnetToSerial(device, user, password, read_timeout=10) else: |