summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
authorPaul Sokolovsky <pfalcon@users.sourceforge.net>2017-04-04 17:46:02 +0300
committerPaul Sokolovsky <pfalcon@users.sourceforge.net>2017-04-04 17:46:02 +0300
commit647e72ca63a345a5d6de16fe359bbc3b7c6615ec (patch)
tree385288a083a2deda810447e7e7a6c6177381b412
parent58168c8e6bcf3619d281a8c877ef20e5efe049d2 (diff)
downloadmicropython-647e72ca63a345a5d6de16fe359bbc3b7c6615ec.tar.gz
micropython-647e72ca63a345a5d6de16fe359bbc3b7c6615ec.zip
tools/pyboard: Add "exec" and "execpty" pseudo-devices support.
This allows to execute a command and communicate with its stdin/stdout via pipes ("exec") or with command-created pseudo-terminal ("execpty"), to emulate serial access. Immediate usecase is controlling a QEMU process which emulates board's serial via normal console, but it could be used e.g. with helper binaries to access real board over other hadware protocols, etc. An example of device specification for these cases is: --device exec:../zephyr/qemu.sh --device execpty:../zephyr/qemu2.sh Where qemu.sh contains long-long qemu startup line, or calls another command. There's a special support in this patch for running the command in a new terminal session, to support shell wrappers like that (without new terminal session, only wrapper script would be terminated, but its child processes would continue to run).
-rwxr-xr-xtools/pyboard.py85
1 files changed, 84 insertions, 1 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py
index f368455f5e..131634c906 100755
--- a/tools/pyboard.py
+++ b/tools/pyboard.py
@@ -39,6 +39,7 @@ Or:
import sys
import time
+import os
try:
stdout = sys.stdout.buffer
@@ -116,9 +117,91 @@ class TelnetToSerial:
else:
return n_waiting
+
+class ProcessToSerial:
+ "Execute a process and emulate serial connection using its stdin/stdout."
+
+ def __init__(self, cmd):
+ import subprocess
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ # Initially was implemented with selectors, but that adds Python3
+ # dependency. However, there can be race conditions communicating
+ # with a particular child process (like QEMU), and selectors may
+ # still work better in that case, so left inplace for now.
+ #
+ #import selectors
+ #self.sel = selectors.DefaultSelector()
+ #self.sel.register(self.subp.stdout, selectors.EVENT_READ)
+
+ import select
+ self.poll = select.poll()
+ self.poll.register(self.subp.stdout.fileno())
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ data = b""
+ while len(data) < size:
+ data += self.subp.stdout.read(size - len(data))
+ return data
+
+ def write(self, data):
+ self.subp.stdin.write(data)
+ return len(data)
+
+ def inWaiting(self):
+ #res = self.sel.select(0)
+ res = self.poll.poll(0)
+ if res:
+ return 1
+ return 0
+
+
+class ProcessPtyToTerminal:
+ """Execute a process which creates a PTY and prints slave PTY as
+ first line of its output, and emulate serial connection using
+ this PTY."""
+
+ def __init__(self, cmd):
+ import subprocess
+ import re
+ import serial
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ pty_line = self.subp.stderr.readline().decode("utf-8")
+ m = re.search(r"/dev/pts/[0-9]+", pty_line)
+ if not m:
+ print("Error: unable to find PTY device in startup line:", pty_line)
+ self.close()
+ sys.exit(1)
+ pty = m.group()
+ self.ser = serial.Serial(pty, interCharTimeout=1)
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ return self.ser.read(size)
+
+ def write(self, data):
+ return self.ser.write(data)
+
+ def inWaiting(self):
+ return self.ser.inWaiting()
+
+
class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0):
- if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
+ if device.startswith("exec:"):
+ self.serial = ProcessToSerial(device[len("exec:"):])
+ elif device.startswith("execpty:"):
+ self.serial = ProcessPtyToTerminal(device[len("qemupty:"):])
+ elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
else: