summaryrefslogtreecommitdiffstatshomepage
path: root/tools/pyboard.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/pyboard.py')
-rwxr-xr-xtools/pyboard.py91
1 files changed, 89 insertions, 2 deletions
diff --git a/tools/pyboard.py b/tools/pyboard.py
index d4ce8b7887..5eac030bdd 100755
--- a/tools/pyboard.py
+++ b/tools/pyboard.py
@@ -39,6 +39,7 @@ Or:
import sys
import time
+import os
try:
stdout = sys.stdout.buffer
@@ -116,9 +117,93 @@ class TelnetToSerial:
else:
return n_waiting
+
+class ProcessToSerial:
+ "Execute a process and emulate serial connection using its stdin/stdout."
+
+ def __init__(self, cmd):
+ import subprocess
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=True, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ # Initially was implemented with selectors, but that adds Python3
+ # dependency. However, there can be race conditions communicating
+ # with a particular child process (like QEMU), and selectors may
+ # still work better in that case, so left inplace for now.
+ #
+ #import selectors
+ #self.sel = selectors.DefaultSelector()
+ #self.sel.register(self.subp.stdout, selectors.EVENT_READ)
+
+ import select
+ self.poll = select.poll()
+ self.poll.register(self.subp.stdout.fileno())
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ data = b""
+ while len(data) < size:
+ data += self.subp.stdout.read(size - len(data))
+ return data
+
+ def write(self, data):
+ self.subp.stdin.write(data)
+ return len(data)
+
+ def inWaiting(self):
+ #res = self.sel.select(0)
+ res = self.poll.poll(0)
+ if res:
+ return 1
+ return 0
+
+
+class ProcessPtyToTerminal:
+ """Execute a process which creates a PTY and prints slave PTY as
+ first line of its output, and emulate serial connection using
+ this PTY."""
+
+ def __init__(self, cmd):
+ import subprocess
+ import re
+ import serial
+ self.subp = subprocess.Popen(cmd.split(), bufsize=0, shell=False, preexec_fn=os.setsid,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ pty_line = self.subp.stderr.readline().decode("utf-8")
+ m = re.search(r"/dev/pts/[0-9]+", pty_line)
+ if not m:
+ print("Error: unable to find PTY device in startup line:", pty_line)
+ self.close()
+ sys.exit(1)
+ pty = m.group()
+ # rtscts, dsrdtr params are to workaround pyserial bug:
+ # http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port
+ self.ser = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True)
+
+ def close(self):
+ import signal
+ os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM)
+
+ def read(self, size=1):
+ return self.ser.read(size)
+
+ def write(self, data):
+ return self.ser.write(data)
+
+ def inWaiting(self):
+ return self.ser.inWaiting()
+
+
class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0):
- if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
+ if device.startswith("exec:"):
+ self.serial = ProcessToSerial(device[len("exec:"):])
+ elif device.startswith("execpty:"):
+ self.serial = ProcessPtyToTerminal(device[len("qemupty:"):])
+ elif device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
else:
@@ -234,7 +319,7 @@ class Pyboard:
# check if we could exec command
data = self.serial.read(2)
if data != b'OK':
- raise PyboardError('could not exec command')
+ raise PyboardError('could not exec command (response: %s)' % data)
def exec_raw(self, command, timeout=10, data_consumer=None):
self.exec_raw_no_follow(command);
@@ -300,6 +385,7 @@ def main():
pyb.enter_raw_repl()
except PyboardError as er:
print(er)
+ pyb.close()
sys.exit(1)
def execbuffer(buf):
@@ -307,6 +393,7 @@ def main():
ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
except PyboardError as er:
print(er)
+ pyb.close()
sys.exit(1)
except KeyboardInterrupt:
sys.exit(1)