summaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rwxr-xr-xtools/pyboard.py85
1 files changed, 84 insertions, 1 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py
index f368455f5e..131634c906 100755
--- a/tools/pyboard.py
+++ b/tools/pyboard.py
@@ -39,6 +39,7 @@ Or:
import sys
import time
+import os
try:
stdout = sys.stdout.buffer
@@ -116,9 +117,91 @@ class TelnetToSerial:
else:
return n_waiting
+
+class ProcessToSerial:
+ "Execute a process and emulate serial connection using its stdin/stdout."
+
+ def __init__(self, cmd):
+ import subprocess
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ # Initially was implemented with selectors, but that adds Python3
+ # dependency. However, there can be race conditions communicating
+ # with a particular child process (like QEMU), and selectors may
+ # still work better in that case, so left inplace for now.
+ #
+ #import selectors
+ #self.sel = selectors.DefaultSelector()
+ #self.sel.register(self.subp.stdout, selectors.EVENT_READ)
+
+ import select
+ self.poll = select.poll()
+ self.poll.register(self.subp.stdout.fileno())
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ data = b""
+ while len(data) < size:
+ data += self.subp.stdout.read(size - len(data))
+ return data
+
+ def write(self, data):
+ self.subp.stdin.write(data)
+ return len(data)
+
+ def inWaiting(self):
+ #res = self.sel.select(0)
+ res = self.poll.poll(0)
+ if res:
+ return 1
+ return 0
+
+
+class ProcessPtyToTerminal:
+ """Execute a process which creates a PTY and prints slave PTY as
+ first line of its output, and emulate serial connection using
+ this PTY."""
+
+ def __init__(self, cmd):
+ import subprocess
+ import re
+ import serial
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ pty_line = self.subp.stderr.readline().decode("utf-8")
+ m = re.search(r"/dev/pts/[0-9]+", pty_line)
+ if not m:
+ print("Error: unable to find PTY device in startup line:", pty_line)
+ self.close()
+ sys.exit(1)
+ pty = m.group()
+ self.ser = serial.Serial(pty, interCharTimeout=1)
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ return self.ser.read(size)
+
+ def write(self, data):
+ return self.ser.write(data)
+
+ def inWaiting(self):
+ return self.ser.inWaiting()
+
+
class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0):
- if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
+ if device.startswith("exec:"):
+ self.serial = ProcessToSerial(device[len("exec:"):])
+ elif device.startswith("execpty:"):
+ self.serial = ProcessPtyToTerminal(device[len("qemupty:"):])
+ elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
else: