aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Tools/c-analyzer/c_common/tables.py
diff options
context:
space:
mode:
Diffstat (limited to 'Tools/c-analyzer/c_common/tables.py')
-rw-r--r--Tools/c-analyzer/c_common/tables.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/Tools/c-analyzer/c_common/tables.py b/Tools/c-analyzer/c_common/tables.py
new file mode 100644
index 00000000000..70a230a90b6
--- /dev/null
+++ b/Tools/c-analyzer/c_common/tables.py
@@ -0,0 +1,213 @@
+import csv
+
+from . import NOT_SET, strutil, fsutil
+
+
# Canonical marker that fix_row() substitutes for any value matching one
# of the "empty" markers.
EMPTY = '-'
# Canonical marker that fix_row() substitutes for any value matching one
# of the "unknown" markers.
UNKNOWN = '???'
+
+
def parse_markers(markers, default=None):
    """Normalize a "markers" argument into a collection of marker strings.

    * NOT_SET        -> *default* is returned
    * any falsy value -> None
    * a non-str value -> returned unchanged
    * a str made of a single repeated character (e.g. '-' or '???')
                      -> a one-item list holding the whole string
    * any other str   -> a list of its individual characters
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is not str:
        return markers
    # A run of one repeated character is treated as a single marker;
    # anything else is treated as a set of one-character markers.
    repeated = markers[0] * len(markers)
    return [markers] if markers == repeated else list(markers)
+
+
def fix_row(row, **markers):
    """Return a lazy iterable of the row's values, with markers normalized.

    Falsy values become None.  A value found in the "empty" markers
    (keyword "empty", default ('-',)) is replaced with EMPTY and a value
    found in the "unknown" markers (keyword "unknown", default ('???',))
    is replaced with UNKNOWN.  Both keywords are run through
    parse_markers(); a falsy result disables that substitution.

    Raises NotImplementedError if the row is a str (i.e. not yet split).
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    cleaned = (val if val else None for val in row)

    if empty and unknown:
        # "empty" takes precedence over "unknown".
        return (
            EMPTY if val in empty else (UNKNOWN if val in unknown else val)
            for val in cleaned
        )
    if empty:
        return (EMPTY if val in empty else val for val in cleaned)
    if unknown:
        return (UNKNOWN if val in unknown else val for val in cleaned)
    return cleaned
+
+
+def _fix_read_default(row):
+ for value in row:
+ yield value.strip()
+
+
+def _fix_write_default(row, empty=''):
+ for value in row:
+ yield empty if value is None else str(value)
+
+
+def _normalize_fix_read(fix):
+ if fix is None:
+ fix = ''
+ if callable(fix):
+ def fix_row(row):
+ values = fix(row)
+ return _fix_read_default(values)
+ elif isinstance(fix, str):
+ def fix_row(row):
+ values = _fix_read_default(row)
+ return (None if v == fix else v
+ for v in values)
+ else:
+ raise NotImplementedError(fix)
+ return fix_row
+
+
+def _normalize_fix_write(fix, empty=''):
+ if fix is None:
+ fix = empty
+ if callable(fix):
+ def fix_row(row):
+ values = fix(row)
+ return _fix_write_default(values, empty)
+ elif isinstance(fix, str):
+ def fix_row(row):
+ return _fix_write_default(row, fix)
+ else:
+ raise NotImplementedError(fix)
+ return fix_row
+
+
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given delimiter-separated (e.g. tab) file.

    "infile" is either a filename (str), which is opened here, or an
    already-open iterable of lines.  "header" is the expected header
    line, either as a str or as a sequence of column names (which are
    joined with the delimiter).  A falsy "sep" means tab.

    "fix" controls how each row is post-processed (see
    _normalize_fix_read()): by default the values are stripped and empty
    values become None.

    Each row is yielded as a tuple.  ValueError is raised if the first
    significant line of the file does not match the expected header.
    """
    if isinstance(infile, str):
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return

    # Settle on the delimiter once so the expected header is built with
    # the same delimiter the reader uses (a falsy "sep" means tab).
    sep = sep or '\t'
    # NOTE(review): presumably this drops blank/comment lines; see strutil.
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        # An empty file has no header at all.
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep):
        yield tuple(fix_row(row))
+
+
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given delimiter-separated (e.g. tab) file.

    "outfile" is either a filename (str), which is opened for writing
    here, or an already-open writable file.  "header" is the header row,
    either as a str (split on the delimiter) or as a sequence of column
    names.  A falsy "sep" means tab.

    "fix" controls how each row is pre-processed (see
    _normalize_fix_write()): by default every value is stringified and
    None is written as the empty string.

    If "backup" is truthy then fsutil.create_backup() is called with it
    before anything is written.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup (if any) was already requested above, before
                # the file was opened for writing.  Asking for it again
                # here would act on the already-truncated file.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    delimiter = sep or '\t'
    if isinstance(header, str):
        header = header.split(delimiter)
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=delimiter)
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )
+
+
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each line found via the given entries.

    The lines come from strutil.parse_entries(), which yields
    (line, filename) pairs.  When a header is given, the first line
    seen for each new filename must be that header (it is skipped);
    otherwise NotImplementedError is raised.  For lines without a
    filename, "rawsep" is used as the separator when it is set and
    "sep" does not appear in the line.

    With "strict", every row must have the same number of columns: the
    header's column count if there is a header, else that of the first
    row.  Short rows are padded with "default" (see _parse_row()).

    Raises ValueError if no separator is given and none could be
    inferred from the header.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = len(header.split(sep)) if (header and strict) else None
    # The file whose header has most recently been consumed.
    cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        rowsep = sep
        if filename:
            if header and cur_file != filename:
                cur_file = filename
                if line.strip() != header:
                    # We expected the header.
                    raise NotImplementedError((header, line))
                # Skip the first line since it's the header.
                continue
        elif rawsep and sep not in line:
            rowsep = rawsep

        row = _parse_row(line, rowsep, ncols, default)
        if strict and not ncols:
            # Without a header, the first row fixes the column count.
            ncols = len(row)
        yield row, filename
+
+
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Split the line on "sep" and return the stripped values as a tuple.

    This is the public wrapper around _parse_row(); it additionally
    requires a separator and raises ValueError if "sep" is falsy.
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
+
+
+def _parse_row(line, sep, ncols, default):
+ row = tuple(v.strip() for v in line.split(sep))
+ if (ncols or 0) > 0:
+ diff = ncols - len(row)
+ if diff:
+ if default is NOT_SET or diff < 0:
+ raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
+ row += (default,) * diff
+ return row
+
+
+def _normalize_table_file_props(header, sep):
+ if not header:
+ return None, sep
+
+ if not isinstance(header, str):
+ if not sep:
+ raise NotImplementedError(header)
+ header = sep.join(header)
+ elif not sep:
+ for sep in ('\t', ',', ' '):
+ if sep in header:
+ break
+ else:
+ sep = None
+ return header, sep