summaryrefslogtreecommitdiffstatshomepage
path: root/tools
diff options
context:
space:
mode:
authorJim Mussared <jim.mussared@gmail.com>2023-06-09 14:05:44 +1000
committerDamien George <damien@micropython.org>2024-10-09 16:12:30 +1100
commitdb59e55fe7a0b67d3af868990468e7b8056afe42 (patch)
treebfce46ffc4b753c154a675931e329c781129bcc1 /tools
parent10910219958a6b122de908beb1d5bd32b37972a2 (diff)
downloadmicropython-db59e55fe7a0b67d3af868990468e7b8056afe42.tar.gz
micropython-db59e55fe7a0b67d3af868990468e7b8056afe42.zip
tools/mpremote: Make filesystem commands use transport API.
This introduces a Python filesystem API on `Transport` that is implemented entirely with eval/exec provided by the underlying transport subclass. Updates existing mpremote filesystem commands (and `edit) to use this API. Also re-implements recursive `cp` to allow arbitrary source / destination. This work was funded through GitHub Sponsors. Signed-off-by: Jim Mussared <jim.mussared@gmail.com> Signed-off-by: Damien George <damien@micropython.org>
Diffstat (limited to 'tools')
-rw-r--r--tools/mpremote/mpremote/commands.py277
-rw-r--r--tools/mpremote/mpremote/main.py2
-rw-r--r--tools/mpremote/mpremote/mip.py32
-rw-r--r--tools/mpremote/mpremote/transport.py145
-rw-r--r--tools/mpremote/mpremote/transport_serial.py228
5 files changed, 379 insertions, 305 deletions
diff --git a/tools/mpremote/mpremote/commands.py b/tools/mpremote/mpremote/commands.py
index fcda62ad26..db2be3b131 100644
--- a/tools/mpremote/mpremote/commands.py
+++ b/tools/mpremote/mpremote/commands.py
@@ -4,8 +4,8 @@ import tempfile
import serial.tools.list_ports
-from .transport import TransportError
-from .transport_serial import SerialTransport, stdout_write_bytes
+from .transport import TransportError, stdout_write_bytes
+from .transport_serial import SerialTransport
class CommandError(Exception):
@@ -106,61 +106,238 @@ def show_progress_bar(size, total_size, op="copying"):
)
+def _remote_path_join(a, *b):
+ if not a:
+ a = "./"
+ result = a.rstrip("/")
+ for x in b:
+ result += "/" + x.strip("/")
+ return result
+
+
+def _remote_path_dirname(a):
+ a = a.rsplit("/", 1)
+ if len(a) == 1:
+ return ""
+ else:
+ return a[0]
+
+
+def _remote_path_basename(a):
+ return a.rsplit("/", 1)[-1]
+
+
+def do_filesystem_cp(state, src, dest, multiple):
+ if dest.startswith(":"):
+ dest_exists = state.transport.fs_exists(dest[1:])
+ dest_isdir = dest_exists and state.transport.fs_isdir(dest[1:])
+ else:
+ dest_exists = os.path.exists(dest)
+ dest_isdir = dest_exists and os.path.isdir(dest)
+
+ if multiple:
+ if not dest_exists:
+ raise CommandError("cp: destination does not exist")
+ if not dest_isdir:
+ raise CommandError("cp: destination is not a directory")
+
+ # Download the contents of source.
+ try:
+ if src.startswith(":"):
+ data = state.transport.fs_readfile(src[1:], progress_callback=show_progress_bar)
+ filename = _remote_path_basename(src[1:])
+ else:
+ with open(src, "rb") as f:
+ data = f.read()
+ filename = os.path.basename(src)
+ except IsADirectoryError:
+ raise CommandError("cp: -r not specified; omitting directory")
+
+ # Write back to dest.
+ if dest.startswith(":"):
+ # If the destination path is just the directory, then add the source filename.
+ if dest_isdir:
+ dest = ":" + _remote_path_join(dest[1:], filename)
+
+ # Write to remote.
+ state.transport.fs_writefile(dest[1:], data, progress_callback=show_progress_bar)
+ else:
+ # If the destination path is just the directory, then add the source filename.
+ if dest_isdir:
+ dest = os.path.join(dest, filename)
+
+ # Write to local file.
+ with open(dest, "wb") as f:
+ f.write(data)
+
+
+def do_filesystem_recursive_cp(state, src, dest, multiple):
+ # Ignore trailing / on both src and dest. (Unix cp ignores them too)
+ src = src.rstrip("/" + os.path.sep + (os.path.altsep if os.path.altsep else ""))
+ dest = dest.rstrip("/" + os.path.sep + (os.path.altsep if os.path.altsep else ""))
+
+ # If the destination directory exists, then we copy into it. Otherwise we
+ # use the destination as the target.
+ if dest.startswith(":"):
+ dest_exists = state.transport.fs_exists(dest[1:])
+ else:
+ dest_exists = os.path.exists(dest)
+
+ # Recursively find all files to copy from a directory.
+ # `dirs` will be a list of dest split paths.
+ # `files` will be a list of `(dest split path, src joined path)`.
+ dirs = []
+ files = []
+
+ # For example, if src=/tmp/foo, with /tmp/foo/x.py and /tmp/foo/a/b/c.py,
+ # and if the destination directory exists, then we will have:
+ # dirs = [['foo'], ['foo', 'a'], ['foo', 'a', 'b']]
+ # files = [(['foo', 'x.py'], '/tmp/foo/x.py'), (['foo', 'a', 'b', 'c.py'], '/tmp/foo/a/b/c.py')]
+ # If the destination doesn't exist, then we will have:
+ # dirs = [['a'], ['a', 'b']]
+ # files = [(['x.py'], '/tmp/foo/x.py'), (['a', 'b', 'c.py'], '/tmp/foo/a/b/c.py')]
+
+ def _list_recursive(base, src_path, dest_path, src_join_fun, src_isdir_fun, src_listdir_fun):
+ src_path_joined = src_join_fun(base, *src_path)
+ if src_isdir_fun(src_path_joined):
+ if dest_path:
+ dirs.append(dest_path)
+ for entry in src_listdir_fun(src_path_joined):
+ _list_recursive(
+ base,
+ src_path + [entry],
+ dest_path + [entry],
+ src_join_fun,
+ src_isdir_fun,
+ src_listdir_fun,
+ )
+ else:
+ files.append(
+ (
+ dest_path,
+ src_path_joined,
+ )
+ )
+
+ if src.startswith(":"):
+ src_dirname = [_remote_path_basename(src[1:])]
+ dest_dirname = src_dirname if dest_exists else []
+ _list_recursive(
+ _remote_path_dirname(src[1:]),
+ src_dirname,
+ dest_dirname,
+ src_join_fun=_remote_path_join,
+ src_isdir_fun=state.transport.fs_isdir,
+ src_listdir_fun=lambda p: [x.name for x in state.transport.fs_listdir(p)],
+ )
+ else:
+ src_dirname = [os.path.basename(src)]
+ dest_dirname = src_dirname if dest_exists else []
+ _list_recursive(
+ os.path.dirname(src),
+ src_dirname,
+ dest_dirname,
+ src_join_fun=os.path.join,
+ src_isdir_fun=os.path.isdir,
+ src_listdir_fun=os.listdir,
+ )
+
+ # If no directories were encountered then we must have just had a file.
+ if not dirs:
+ return do_filesystem_cp(state, src, dest, multiple)
+
+ def _mkdir(a, *b):
+ try:
+ if a.startswith(":"):
+ state.transport.fs_mkdir(_remote_path_join(a[1:], *b))
+ else:
+ os.mkdir(os.path.join(a, *b))
+ except FileExistsError:
+ pass
+
+ # Create the destination if necessary.
+ if not dest_exists:
+ _mkdir(dest)
+
+ # Create all sub-directories relative to the destination.
+ for d in dirs:
+ _mkdir(dest, *d)
+
+ # Copy all files, in sorted order to help it be deterministic.
+ files.sort()
+ for dest_path_split, src_path_joined in files:
+ if src.startswith(":"):
+ src_path_joined = ":" + src_path_joined
+
+ if dest.startswith(":"):
+ dest_path_joined = ":" + _remote_path_join(dest[1:], *dest_path_split)
+ else:
+ dest_path_joined = os.path.join(dest, *dest_path_split)
+
+ do_filesystem_cp(state, src_path_joined, dest_path_joined, multiple=False)
+
+
def do_filesystem(state, args):
state.ensure_raw_repl()
state.did_action()
- def _list_recursive(files, path):
- if os.path.isdir(path):
- for entry in os.listdir(path):
- _list_recursive(files, "/".join((path, entry)))
- else:
- files.append(os.path.split(path))
-
command = args.command[0]
paths = args.path
if command == "cat":
- # Don't be verbose by default when using cat, so output can be
- # redirected to something.
+ # Don't do verbose output for `cat` unless explicitly requested.
verbose = args.verbose is True
else:
verbose = args.verbose is not False
- if command == "cp" and args.recursive:
- if paths[-1] != ":":
- raise CommandError("'cp -r' destination must be ':'")
- paths.pop()
- src_files = []
- for path in paths:
- if path.startswith(":"):
- raise CommandError("'cp -r' source files must be local")
- _list_recursive(src_files, path)
- known_dirs = {""}
- state.transport.exec("import os")
- for dir, file in src_files:
- dir_parts = dir.split("/")
- for i in range(len(dir_parts)):
- d = "/".join(dir_parts[: i + 1])
- if d not in known_dirs:
- state.transport.exec(
- "try:\n os.mkdir('%s')\nexcept OSError as e:\n print(e)" % d
- )
- known_dirs.add(d)
- state.transport.filesystem_command(
- ["cp", "/".join((dir, file)), ":" + dir + "/"],
- progress_callback=show_progress_bar,
- verbose=verbose,
- )
+ if command == "cp":
+ # Note: cp requires the user to specify local/remote explicitly via
+ # leading ':'.
+
+ # The last argument must be the destination.
+ if len(paths) <= 1:
+ raise CommandError("cp: missing destination path")
+ cp_dest = paths[-1]
+ paths = paths[:-1]
else:
- if args.recursive:
- raise CommandError("'-r' only supported for 'cp'")
- try:
- state.transport.filesystem_command(
- [command] + paths, progress_callback=show_progress_bar, verbose=verbose
- )
- except OSError as er:
- raise CommandError(er)
+ # All other commands implicitly use remote paths. Strip the
+ # leading ':' if the user included them.
+ paths = [path[1:] if path.startswith(":") else path for path in paths]
+
+ # ls implicitly lists the cwd.
+ if command == "ls" and not paths:
+ paths = [""]
+
+ # Handle each path sequentially.
+ for path in paths:
+ if verbose:
+ if command == "cp":
+ print("{} {} {}".format(command, path, cp_dest))
+ else:
+ print("{} :{}".format(command, path))
+
+ if command == "cat":
+ state.transport.fs_printfile(path)
+ elif command == "ls":
+ for result in state.transport.fs_listdir(path):
+ print(
+ "{:12} {}{}".format(
+ result.st_size, result.name, "/" if result.st_mode & 0x4000 else ""
+ )
+ )
+ elif command == "mkdir":
+ state.transport.fs_mkdir(path)
+ elif command == "rm":
+ state.transport.fs_rmfile(path)
+ elif command == "rmdir":
+ state.transport.fs_rmdir(path)
+ elif command == "touch":
+ state.transport.fs_touchfile(path)
+ elif command == "cp":
+ if args.recursive:
+ do_filesystem_recursive_cp(state, path, cp_dest, len(paths) > 1)
+ else:
+ do_filesystem_cp(state, path, cp_dest, len(paths) > 1)
def do_edit(state, args):
@@ -174,11 +351,15 @@ def do_edit(state, args):
dest_fd, dest = tempfile.mkstemp(suffix=os.path.basename(src))
try:
print("edit :%s" % (src,))
- os.close(dest_fd)
- state.transport.fs_touch(src)
- state.transport.fs_get(src, dest, progress_callback=show_progress_bar)
+ state.transport.fs_touchfile(src)
+ data = state.transport.fs_readfile(src, progress_callback=show_progress_bar)
+ with open(dest_fd, "wb") as f:
+ f.write(data)
if os.system('%s "%s"' % (os.getenv("EDITOR"), dest)) == 0:
- state.transport.fs_put(dest, src, progress_callback=show_progress_bar)
+ with open(dest, "rb") as f:
+ state.transport.fs_writefile(
+ src, f.read(), progress_callback=show_progress_bar
+ )
finally:
os.unlink(dest)
diff --git a/tools/mpremote/mpremote/main.py b/tools/mpremote/mpremote/main.py
index e4d15fb47c..427e11c744 100644
--- a/tools/mpremote/mpremote/main.py
+++ b/tools/mpremote/mpremote/main.py
@@ -190,7 +190,7 @@ def argparse_filesystem():
"enable verbose output (defaults to True for all commands except cat)",
)
cmd_parser.add_argument(
- "command", nargs=1, help="filesystem command (e.g. cat, cp, ls, rm, touch)"
+ "command", nargs=1, help="filesystem command (e.g. cat, cp, ls, rm, rmdir, touch)"
)
cmd_parser.add_argument("path", nargs="+", help="local and remote paths")
return cmd_parser
diff --git a/tools/mpremote/mpremote/mip.py b/tools/mpremote/mpremote/mip.py
index 04f5042524..d23a0e2cbc 100644
--- a/tools/mpremote/mpremote/mip.py
+++ b/tools/mpremote/mpremote/mip.py
@@ -12,13 +12,10 @@ from .commands import CommandError, show_progress_bar
_PACKAGE_INDEX = "https://micropython.org/pi/v2"
-_CHUNK_SIZE = 128
# This implements os.makedirs(os.dirname(path))
def _ensure_path_exists(transport, path):
- import os
-
split = path.split("/")
# Handle paths starting with "/".
@@ -34,22 +31,6 @@ def _ensure_path_exists(transport, path):
prefix += "/"
-# Copy from src (stream) to dest (function-taking-bytes)
-def _chunk(src, dest, length=None, op="downloading"):
- buf = memoryview(bytearray(_CHUNK_SIZE))
- total = 0
- if length:
- show_progress_bar(0, length, op)
- while True:
- n = src.readinto(buf)
- if n == 0:
- break
- dest(buf if n == _CHUNK_SIZE else buf[:n])
- total += n
- if length:
- show_progress_bar(total, length, op)
-
-
def _rewrite_url(url, branch=None):
if not branch:
branch = "HEAD"
@@ -83,15 +64,10 @@ def _rewrite_url(url, branch=None):
def _download_file(transport, url, dest):
try:
with urllib.request.urlopen(url) as src:
- fd, path = tempfile.mkstemp()
- try:
- print("Installing:", dest)
- with os.fdopen(fd, "wb") as f:
- _chunk(src, f.write, src.length)
- _ensure_path_exists(transport, dest)
- transport.fs_put(path, dest, progress_callback=show_progress_bar)
- finally:
- os.unlink(path)
+ data = src.read()
+ print("Installing:", dest)
+ _ensure_path_exists(transport, dest)
+ transport.fs_writefile(dest, data, progress_callback=show_progress_bar)
except urllib.error.HTTPError as e:
if e.status == 404:
raise CommandError(f"File not found: {url}")
diff --git a/tools/mpremote/mpremote/transport.py b/tools/mpremote/mpremote/transport.py
index 6e9a77b2bb..2acc8c3b2b 100644
--- a/tools/mpremote/mpremote/transport.py
+++ b/tools/mpremote/mpremote/transport.py
@@ -24,10 +24,153 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
+import ast, os, sys
+from collections import namedtuple
+
+
+def stdout_write_bytes(b):
+ b = b.replace(b"\x04", b"")
+ if hasattr(sys.stdout, "buffer"):
+ sys.stdout.buffer.write(b)
+ sys.stdout.buffer.flush()
+ else:
+ text = b.decode(sys.stdout.encoding, "strict")
+ sys.stdout.write(text)
+
class TransportError(Exception):
pass
+listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
+
+
+# Takes a Transport error (containing the text of an OSError traceback) and
+# raises it as the corresponding OSError-derived exception.
+def _convert_filesystem_error(e, info):
+ if len(e.args) >= 3:
+ if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
+ return FileNotFoundError(info)
+ if b"OSError" in e.args[2] and b"EISDIR" in e.args[2]:
+ return IsADirectoryError(info)
+ if b"OSError" in e.args[2] and b"EEXIST" in e.args[2]:
+ return FileExistsError(info)
+ return e
+
+
class Transport:
- pass
+ def fs_listdir(self, src=""):
+ buf = bytearray()
+
+ def repr_consumer(b):
+ buf.extend(b.replace(b"\x04", b""))
+
+ cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
+ ("'%s'" % src) if src else ""
+ )
+ try:
+ buf.extend(b"[")
+ self.exec(cmd, data_consumer=repr_consumer)
+ buf.extend(b"]")
+ except TransportError as e:
+ raise _convert_filesystem_error(e, src) from None
+
+ return [
+ listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
+ for f in ast.literal_eval(buf.decode())
+ ]
+
+ def fs_stat(self, src):
+ try:
+ self.exec("import os")
+ return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src)))
+ except TransportError as e:
+ raise _convert_filesystem_error(e, src) from None
+
+ def fs_exists(self, src):
+ try:
+ self.fs_stat(src)
+ return True
+ except OSError:
+ return False
+
+ def fs_isdir(self, src):
+ try:
+ mode = self.fs_stat(src).st_mode
+ return (mode & 0x4000) != 0
+ except OSError:
+ # Match CPython, a non-existent path is not a directory.
+ return False
+
+ def fs_printfile(self, src, chunk_size=256):
+ cmd = (
+ "with open('%s') as f:\n while 1:\n"
+ " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
+ )
+ try:
+ self.exec(cmd, data_consumer=stdout_write_bytes)
+ except TransportError as e:
+ raise _convert_filesystem_error(e, src) from None
+
+ def fs_readfile(self, src, chunk_size=256, progress_callback=None):
+ if progress_callback:
+ src_size = self.fs_stat(src).st_size
+
+ contents = bytearray()
+
+ try:
+ self.exec("f=open('%s','rb')\nr=f.read" % src)
+ while True:
+ chunk = self.eval("r({})".format(chunk_size))
+ if not chunk:
+ break
+ contents.extend(chunk)
+ if progress_callback:
+ progress_callback(len(contents), src_size)
+ self.exec("f.close()")
+ except TransportError as e:
+ raise _convert_filesystem_error(e, src) from None
+
+ return contents
+
+ def fs_writefile(self, dest, data, chunk_size=256, progress_callback=None):
+ if progress_callback:
+ src_size = len(data)
+ written = 0
+
+ try:
+ self.exec("f=open('%s','wb')\nw=f.write" % dest)
+ while data:
+ chunk = data[:chunk_size]
+ self.exec("w(" + repr(chunk) + ")")
+ written += len(chunk)
+ data = data[len(chunk) :]
+ if progress_callback:
+ progress_callback(written, src_size)
+ self.exec("f.close()")
+ except TransportError as e:
+ raise _convert_filesystem_error(e, dest) from None
+
+ def fs_mkdir(self, path):
+ try:
+ self.exec("import os\nos.mkdir('%s')" % path)
+ except TransportError as e:
+ raise _convert_filesystem_error(e, path) from None
+
+ def fs_rmdir(self, path):
+ try:
+ self.exec("import os\nos.rmdir('%s')" % path)
+ except TransportError as e:
+ raise _convert_filesystem_error(e, path) from None
+
+ def fs_rmfile(self, path):
+ try:
+ self.exec("import os\nos.remove('%s')" % path)
+ except TransportError as e:
+ raise _convert_filesystem_error(e, path) from None
+
+ def fs_touchfile(self, path):
+ try:
+ self.exec("f=open('%s','a')\nf.close()" % path)
+ except TransportError as e:
+ raise _convert_filesystem_error(e, path) from None
diff --git a/tools/mpremote/mpremote/transport_serial.py b/tools/mpremote/mpremote/transport_serial.py
index 69ebf916a2..e6a41e1ffa 100644
--- a/tools/mpremote/mpremote/transport_serial.py
+++ b/tools/mpremote/mpremote/transport_serial.py
@@ -35,29 +35,12 @@
# Once the API is stabilised, the idea is that mpremote can be used both
# as a command line tool and a library for interacting with devices.
-import ast, io, errno, os, re, struct, sys, time
-from collections import namedtuple
+import ast, io, os, re, struct, sys, time
from errno import EPERM
from .console import VT_ENABLED
from .transport import TransportError, Transport
-def stdout_write_bytes(b):
- b = b.replace(b"\x04", b"")
- sys.stdout.buffer.write(b)
- sys.stdout.buffer.flush()
-
-
-listdir_result = namedtuple("dir_result", ["name", "st_mode", "st_ino", "st_size"])
-
-
-def reraise_filesystem_error(e, info):
- if len(e.args) >= 3:
- if b"OSError" in e.args[2] and b"ENOENT" in e.args[2]:
- raise FileNotFoundError(info)
- raise
-
-
class SerialTransport(Transport):
def __init__(self, device, baudrate=115200, wait=0, exclusive=True):
self.in_raw_repl = False
@@ -292,215 +275,6 @@ class SerialTransport(Transport):
pyfile = f.read()
return self.exec(pyfile)
- def fs_exists(self, src):
- try:
- self.exec("import os\nos.stat(%s)" % (("'%s'" % src) if src else ""))
- return True
- except TransportError:
- return False
-
- def fs_ls(self, src):
- cmd = (
- "import os\nfor f in os.ilistdir(%s):\n"
- " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))"
- % (("'%s'" % src) if src else "")
- )
- self.exec(cmd, data_consumer=stdout_write_bytes)
-
- def fs_listdir(self, src=""):
- buf = bytearray()
-
- def repr_consumer(b):
- buf.extend(b.replace(b"\x04", b""))
-
- cmd = "import os\nfor f in os.ilistdir(%s):\n" " print(repr(f), end=',')" % (
- ("'%s'" % src) if src else ""
- )
- try:
- buf.extend(b"[")
- self.exec(cmd, data_consumer=repr_consumer)
- buf.extend(b"]")
- except TransportError as e:
- reraise_filesystem_error(e, src)
-
- return [
- listdir_result(*f) if len(f) == 4 else listdir_result(*(f + (0,)))
- for f in ast.literal_eval(buf.decode())
- ]
-
- def fs_stat(self, src):
- try:
- self.exec("import os")
- return os.stat_result(self.eval("os.stat(%s)" % ("'%s'" % src)))
- except TransportError as e:
- reraise_filesystem_error(e, src)
-
- def fs_cat(self, src, chunk_size=256):
- cmd = (
- "with open('%s') as f:\n while 1:\n"
- " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
- )
- self.exec(cmd, data_consumer=stdout_write_bytes)
-
- def fs_readfile(self, src, chunk_size=256):
- buf = bytearray()
-
- def repr_consumer(b):
- buf.extend(b.replace(b"\x04", b""))
-
- cmd = (
- "with open('%s', 'rb') as f:\n while 1:\n"
- " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size)
- )
- try:
- self.exec(cmd, data_consumer=repr_consumer)
- except TransportError as e:
- reraise_filesystem_error(e, src)
- return ast.literal_eval(buf.decode())
-
- def fs_writefile(self, dest, data, chunk_size=256):
- self.exec("f=open('%s','wb')\nw=f.write" % dest)
- while data:
- chunk = data[:chunk_size]
- self.exec("w(" + repr(chunk) + ")")
- data = data[len(chunk) :]
- self.exec("f.close()")
-
- def fs_cp(self, src, dest, chunk_size=256, progress_callback=None):
- if progress_callback:
- src_size = self.fs_stat(src).st_size
- written = 0
- self.exec("fr=open('%s','rb')\nr=fr.read\nfw=open('%s','wb')\nw=fw.write" % (src, dest))
- while True:
- data_len = int(self.exec("d=r(%u)\nw(d)\nprint(len(d))" % chunk_size))
- if not data_len:
- break
- if progress_callback:
- written += data_len
- progress_callback(written, src_size)
- self.exec("fr.close()\nfw.close()")
-
- def fs_get(self, src, dest, chunk_size=256, progress_callback=None):
- if progress_callback:
- src_size = self.fs_stat(src).st_size
- written = 0
- self.exec("f=open('%s','rb')\nr=f.read" % src)
- with open(dest, "wb") as f:
- while True:
- data = bytearray()
- self.exec("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d))
- assert data.endswith(b"\r\n\x04")
- try:
- data = ast.literal_eval(str(data[:-3], "ascii"))
- if not isinstance(data, bytes):
- raise ValueError("Not bytes")
- except (UnicodeError, ValueError) as e:
- raise TransportError("fs_get: Could not interpret received data: %s" % str(e))
- if not data:
- break
- f.write(data)
- if progress_callback:
- written += len(data)
- progress_callback(written, src_size)
- self.exec("f.close()")
-
- def fs_put(self, src, dest, chunk_size=256, progress_callback=None):
- if progress_callback:
- src_size = os.path.getsize(src)
- written = 0
- self.exec("f=open('%s','wb')\nw=f.write" % dest)
- with open(src, "rb") as f:
- while True:
- data = f.read(chunk_size)
- if not data:
- break
- if sys.version_info < (3,):
- self.exec("w(b" + repr(data) + ")")
- else:
- self.exec("w(" + repr(data) + ")")
- if progress_callback:
- written += len(data)
- progress_callback(written, src_size)
- self.exec("f.close()")
-
- def fs_mkdir(self, dir):
- self.exec("import os\nos.mkdir('%s')" % dir)
-
- def fs_rmdir(self, dir):
- self.exec("import os\nos.rmdir('%s')" % dir)
-
- def fs_rm(self, src):
- self.exec("import os\nos.remove('%s')" % src)
-
- def fs_touch(self, src):
- self.exec("f=open('%s','a')\nf.close()" % src)
-
- def filesystem_command(self, args, progress_callback=None, verbose=False):
- def fname_remote(src):
- if src.startswith(":"):
- src = src[1:]
- # Convert all path separators to "/", because that's what a remote device uses.
- return src.replace(os.path.sep, "/")
-
- def fname_cp_dest(src, dest):
- _, src = os.path.split(src)
- if dest is None or dest == "":
- dest = src
- elif dest == ".":
- dest = "./" + src
- elif dest.endswith("/"):
- dest += src
- return dest
-
- cmd = args[0]
- args = args[1:]
- try:
- if cmd == "cp":
- srcs = args[:-1]
- dest = args[-1]
- if dest.startswith(":"):
- op_remote_src = self.fs_cp
- op_local_src = self.fs_put
- else:
- op_remote_src = self.fs_get
- op_local_src = lambda src, dest, **_: __import__("shutil").copy(src, dest)
- for src in srcs:
- if verbose:
- print("cp %s %s" % (src, dest))
- if src.startswith(":"):
- op = op_remote_src
- else:
- op = op_local_src
- src2 = fname_remote(src)
- dest2 = fname_cp_dest(src2, fname_remote(dest))
- op(src2, dest2, progress_callback=progress_callback)
- else:
- ops = {
- "cat": self.fs_cat,
- "ls": self.fs_ls,
- "mkdir": self.fs_mkdir,
- "rm": self.fs_rm,
- "rmdir": self.fs_rmdir,
- "touch": self.fs_touch,
- }
- if cmd not in ops:
- raise TransportError("'{}' is not a filesystem command".format(cmd))
- if cmd == "ls" and not args:
- args = [""]
- for src in args:
- src = fname_remote(src)
- if verbose:
- print("%s :%s" % (cmd, src))
- ops[cmd](src)
- except TransportError as er:
- if len(er.args) > 1:
- print(str(er.args[2], "ascii"))
- else:
- print(er)
- self.exit_raw_repl()
- self.close()
- sys.exit(1)
-
def mount_local(self, path, unsafe_links=False):
fout = self.serial
if not self.eval('"RemoteFS" in globals()'):