Diffstat (limited to 'py/makecompresseddata.py')
-rw-r--r--  py/makecompresseddata.py  200
1 file changed, 200 insertions(+), 0 deletions(-)
diff --git a/py/makecompresseddata.py b/py/makecompresseddata.py
new file mode 100644
index 0000000000..26d6703a9e
--- /dev/null
+++ b/py/makecompresseddata.py
@@ -0,0 +1,200 @@
+from __future__ import print_function
+
+import collections
+import re
+import sys
+
+import gzip
+import zlib
+
+
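+# Compressed strings are emitted with this marker as their first byte, so the
+# runtime can distinguish compressed messages from plain uncompressed text.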
+_COMPRESSED_MARKER = 0xFF
+
+
+def check_non_ascii(msg):
+    for c in msg:
+        if ord(c) >= 0x80:
+            print(
+                'Unable to generate compressed data: message "{}" contains a non-ascii character "{}".'.format(
+                    msg, c
+                ),
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+
+# Replace <char><space> with <char | 0x80>.
+# Trivial scheme to demo/test.
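+# For example (hypothetical input), "a b" becomes "\341b": the pair "a " is
+# re-emitted as the single byte 0x80 | ord("a") == 0o341.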
+def space_compression(error_strings):
+    for line in error_strings:
+        check_non_ascii(line)
+        result = ""
+        for i in range(len(line)):
+            if i > 0 and line[i] == " ":
+                # Drop the space and re-emit the preceding char with its high bit set.
+                result = result[:-1]
+                result += "\\{:03o}".format(0b10000000 | ord(line[i - 1]))
+            else:
+                result += line[i]
+        error_strings[line] = result
+    # This scheme has no shared data table.
+    return None
+
+
+# Replace common words with <0x80 | index>.
+# Index is into a table of words stored as aaaaa<0x80|a>bbb<0x80|b>...
+# Replaced words are assumed to have spaces either side to avoid having to store the spaces in the compressed strings.
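+# For example (hypothetical table), the words ["error", "invalid"] would be
+# stored as "erro\362invali\344": the last char of each entry has its high bit
+# set (0x80 | ord("r") == 0o362, 0x80 | ord("d") == 0o344) to delimit entries.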
+def word_compression(error_strings):
+    topn = collections.Counter()
+
+    for line in error_strings.keys():
+        check_non_ascii(line)
+        for word in line.split(" "):
+            topn[word] += 1
+
+    # Order not just by frequency, but by expected saving, i.e. prefer a longer string that is used less frequently.
+    def bytes_saved(item):
+        w, n = item
+        return -((len(w) + 1) * (n - 1))
+
+    top128 = sorted(topn.items(), key=bytes_saved)[:128]
+
+    index = [w for w, _ in top128]
+    index_lookup = {w: i for i, w in enumerate(index)}
+
+    for line in error_strings.keys():
+        result = ""
+        need_space = False
+        for word in line.split(" "):
+            if word in index_lookup:
+                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
+                need_space = False
+            else:
+                if need_space:
+                    result += " "
+                need_space = True
+                result += word
+        error_strings[line] = result.strip()
+
+    return "".join(w[:-1] + "\\{:03o}".format(0b10000000 | ord(w[-1])) for w in index)
+
+
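+# Sketch only (not used by the build): one way a decompressor could reverse
+# word_compression, assuming raw (unescaped) bytes rather than the octal
+# escapes emitted above. The helper name word_decompress is illustrative.
+def word_decompress(comp, data):
+    # Rebuild the word table: an entry ends at a byte with the high bit set.
+    words, cur = [], ""
+    for c in data:
+        if ord(c) & 0x80:
+            words.append(cur + chr(ord(c) & 0x7F))
+            cur = ""
+        else:
+            cur += c
+    # Expand the message: bytes with the high bit set index the word table,
+    # everything else is literal text.
+    parts = []
+    for c in comp:
+        if ord(c) & 0x80:
+            parts.append(" " + words[ord(c) & 0x7F] + " ")
+        else:
+            parts.append(c)
+    # Collapse the doubled spaces between adjacent replaced words and strip the ends.
+    return " ".join("".join(parts).split())
+
+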
+# Replace chars in text with variable length bit sequence.
+# For comparison only (the table is not emitted).
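+# For example, huffman.codebook([("a", 5), ("b", 2), ("c", 1)]) returns a dict
+# mapping each symbol to its bit string, e.g. {"a": "0", "b": "10", "c": "11"}
+# (the exact codes depend on the library's tie-breaking).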
+def huffman_compression(error_strings):
+    # https://github.com/tannewt/huffman
+    import huffman
+
+    all_strings = "".join(error_strings)
+    cb = huffman.codebook(collections.Counter(all_strings).items())
+
+    for line in error_strings:
+        b = "1"
+        for c in line:
+            b += cb[c]
+        n = len(b)
+        if n % 8 != 0:
+            n += 8 - (n % 8)
+        result = ""
+        for i in range(0, n, 8):
+            result += "\\{:03o}".format(int(b[i : i + 8], 2))
+        if len(result) > len(line) * 4:
+            result = line
+        error_strings[line] = result
+
+    # TODO: This would be the prefix lengths and the table ordering.
+    return "_" * (10 + len(cb))
+
+
+# Replace common N-letter sequences with <0x80 | index>, where
+# the common sequences are stored in a separate table.
+# This isn't very useful; it needs a smarter way to find the top ngrams.
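+# For example, with N=2 the string "tuple index out of range" is scanned as
+# "tu", "pl", "e ", "in", ... and frequent pairs are replaced by one byte.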
+def ngram_compression(error_strings):
+    topn = collections.Counter()
+    N = 2
+
+    for line in error_strings.keys():
+        check_non_ascii(line)
+        if len(line) < N:
+            continue
+        for i in range(0, len(line) - N, N):
+            topn[line[i : i + N]] += 1
+
+    def bytes_saved(item):
+        w, n = item
+        return -(len(w) * (n - 1))
+
+    top128 = sorted(topn.items(), key=bytes_saved)[:128]
+
+    index = [w for w, _ in top128]
+    index_lookup = {w: i for i, w in enumerate(index)}
+
+    for line in error_strings.keys():
+        result = ""
+        for i in range(0, len(line) - N + 1, N):
+            word = line[i : i + N]
+            if word in index_lookup:
+                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
+            else:
+                result += word
+        if len(line) % N != 0:
+            result += line[len(line) - len(line) % N :]
+        error_strings[line] = result.strip()
+
+    return "".join(index)
+
+
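+# Reads a file of collected MP_ERROR_TEXT strings (one per line) and prints
+# the C macro invocations on stdout, e.g. (hypothetical invocation):
+#   python makecompresseddata.py compressed.collected > compressed.data.h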
+def main(collected_path, fn):
+    error_strings = {}
+    max_uncompressed_len = 0
+    num_uses = 0
+
+    # Read in all MP_ERROR_TEXT strings.
+    with open(collected_path, "r") as f:
+        for line in f:
+            line = line.strip()
+            if not line:
+                continue
+            num_uses += 1
+            error_strings[line] = None
+            max_uncompressed_len = max(max_uncompressed_len, len(line))
+
+    # So that objexcept.c can figure out how big the buffer needs to be.
+    print("#define MP_MAX_UNCOMPRESSED_TEXT_LEN ({})".format(max_uncompressed_len))
+
+    # Run the compression.
+    compressed_data = fn(error_strings)
+
+    # Print the data table.
+    print('MP_COMPRESSED_DATA("{}")'.format(compressed_data))
+
+    # Print the replacements.
+    for uncomp, comp in error_strings.items():
+        print('MP_MATCH_COMPRESSED("{}", "\\{:03o}{}")'.format(uncomp, _COMPRESSED_MARKER, comp))
+
+    # Used to calculate the "true" length of the (escaped) compressed strings.
+    def unescape(s):
+        return re.sub(r"\\\d\d\d", "!", s)
+
+    # Stats. Note this doesn't include the cost of the decompressor code.
+    uncomp_len = sum(len(s) + 1 for s in error_strings.keys())
+    comp_len = sum(1 + len(unescape(s)) + 1 for s in error_strings.values())
+    data_len = len(compressed_data) + 1 if compressed_data else 0
+    print("// Total input length: {}".format(uncomp_len))
+    print("// Total compressed length: {}".format(comp_len))
+    print("// Total data length: {}".format(data_len))
+    print("// Predicted saving: {}".format(uncomp_len - comp_len - data_len))
+
+    # Somewhat meaningless comparison to zlib/gzip.
+    all_input_bytes = "\\0".join(error_strings.keys()).encode()
+    print()
+    if hasattr(gzip, "compress"):
+        gzip_len = len(gzip.compress(all_input_bytes)) + num_uses * 4
+        print("// gzip length: {}".format(gzip_len))
+        print("// Percentage of gzip: {:.1f}%".format(100 * (comp_len + data_len) / gzip_len))
+    if hasattr(zlib, "compress"):
+        zlib_len = len(zlib.compress(all_input_bytes)) + num_uses * 4
+        print("// zlib length: {}".format(zlib_len))
+        print("// Percentage of zlib: {:.1f}%".format(100 * (comp_len + data_len) / zlib_len))
+
+
+if __name__ == "__main__":
+    main(sys.argv[1], word_compression)