Backend some buffers

This commit is contained in:
rocky
2019-11-09 18:49:40 -05:00
parent 5de5d2357f
commit 1c0c54991e
2 changed files with 108 additions and 62 deletions

View File

@@ -34,12 +34,11 @@ from __future__ import print_function
import sys
from collections import deque
import uncompyle6
from xdis.code import iscode
from xdis.load import check_object_path, load_module
from uncompyle6.scanner import get_scanner
def disco(version, co, out=None, is_pypy=False):
"""
diassembles and deparses a given code block 'co'
@@ -49,10 +48,9 @@ def disco(version, co, out=None, is_pypy=False):
# store final output stream for case of error
real_out = out or sys.stdout
print('# Python %s' % version, file=real_out)
print("# Python %s" % version, file=real_out)
if co.co_filename:
print('# Embedded file name: %s' % co.co_filename,
file=real_out)
print("# Embedded file name: %s" % co.co_filename, file=real_out)
scanner = get_scanner(version, is_pypy=is_pypy)
@@ -63,10 +61,12 @@ def disco(version, co, out=None, is_pypy=False):
def disco_loop(disasm, queue, real_out):
while len(queue) > 0:
co = queue.popleft()
if co.co_name != '<module>':
print('\n# %s line %d of %s' %
(co.co_name, co.co_firstlineno, co.co_filename),
file=real_out)
if co.co_name != "<module>":
print(
"\n# %s line %d of %s"
% (co.co_name, co.co_firstlineno, co.co_filename),
file=real_out,
)
tokens, customize = disasm(co)
for t in tokens:
if iscode(t.pattr):
@@ -77,6 +77,7 @@ def disco_loop(disasm, queue, real_out):
pass
pass
# def disassemble_fp(fp, outstream=None):
# """
# disassemble Python byte-code from an open file
@@ -90,6 +91,7 @@ def disco_loop(disasm, queue, real_out):
# disco(version, co, outstream, is_pypy=is_pypy)
# co = None
def disassemble_file(filename, outstream=None):
"""
disassemble Python byte-code file (.pyc)
@@ -98,8 +100,7 @@ def disassemble_file(filename, outstream=None):
try to find the corresponding compiled object.
"""
filename = check_object_path(filename)
(version, timestamp, magic_int, co, is_pypy,
source_size) = load_module(filename)
(version, timestamp, magic_int, co, is_pypy, source_size) = load_module(filename)
if type(co) == list:
for con in co:
disco(version, con, outstream)
@@ -107,6 +108,7 @@ def disassemble_file(filename, outstream=None):
disco(version, co, outstream, is_pypy=is_pypy)
co = None
def _test():
"""Simple test program to disassemble a file."""
argc = len(sys.argv)

View File

@@ -30,22 +30,44 @@ import sys
from uncompyle6 import PYTHON3, IS_PYPY
from uncompyle6.scanners.tok import Token
import xdis
from xdis.bytecode import (
Bytecode, instruction_size, extended_arg_val, next_offset)
from xdis.bytecode import Bytecode, instruction_size, extended_arg_val, next_offset
from xdis.magics import canonic_python_version
from xdis.util import code2num
# The byte code versions we support.
# Note: these all have to be floats
PYTHON_VERSIONS = frozenset((1.0, 1.1, 1.3, 1.4, 1.5, 1.6,
2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.7,
3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8))
PYTHON_VERSIONS = frozenset(
(
1.0,
1.1,
1.3,
1.4,
1.5,
1.6,
2.1,
2.2,
2.3,
2.4,
2.5,
2.6,
2.7,
3.0,
3.1,
3.2,
3.3,
3.4,
3.5,
3.6,
3.7,
3.8,
)
)
CANONIC2VERSION = dict((canonic_python_version[str(v)], v) for v in PYTHON_VERSIONS)
# Magic changed mid version for Python 3.5.2. Compatibility was added for
# the older 3.5 interpreter magic.
CANONIC2VERSION['3.5.2'] = 3.5
CANONIC2VERSION["3.5.2"] = 3.5
# FIXME: DRY
@@ -55,24 +77,28 @@ if PYTHON3:
def long(l):
return l
else:
L65536 = long(65536) # NOQA
L65536 = long(65536) # NOQA
class Code(object):
'''
"""
Class for representing code-objects.
This is similar to the original code object, but additionally
the diassembled code is stored in the attribute '_tokens'.
'''
"""
def __init__(self, co, scanner, classname=None):
for i in dir(co):
if i.startswith('co_'):
if i.startswith("co_"):
setattr(self, i, getattr(co, i))
self._tokens, self._customize = scanner.ingest(co, classname)
class Scanner(object):
class Scanner(object):
def __init__(self, version, show_asm=None, is_pypy=False):
self.version = version
self.show_asm = show_asm
@@ -100,7 +126,7 @@ class Scanner(object):
"""
# FIXME: remove this when all subsidiary functions have been removed.
# We should be able to get everything from the self.insts list.
self.code = array('B', co.co_code)
self.code = array("B", co.co_code)
bytecode = Bytecode(co, self.opc)
self.build_prev_op()
@@ -128,7 +154,7 @@ class Scanner(object):
# 'List-map' which shows line number of current op and offset of
# first op on following line, given offset of op as index
lines = []
LineTuple = namedtuple('LineTuple', ['l_no', 'next'])
LineTuple = namedtuple("LineTuple", ["l_no", "next"])
# Iterate through available linestarts, and fill
# the data for all code offsets encountered until
@@ -171,14 +197,14 @@ class Scanner(object):
goes forward.
"""
opname = self.get_inst(offset).opname
if opname == 'JUMP_FORWARD':
if opname == "JUMP_FORWARD":
return True
if opname != 'JUMP_ABSOLUTE':
if opname != "JUMP_ABSOLUTE":
return False
return offset < self.get_target(offset)
def prev_offset(self, offset):
return self.insts[self.offset2inst_index[offset]-1].offset
return self.insts[self.offset2inst_index[offset] - 1].offset
def get_inst(self, offset):
# Instructions can get moved as a result of EXTENDED_ARGS removal.
@@ -205,7 +231,7 @@ class Scanner(object):
return target
def get_argument(self, pos):
arg = self.code[pos+1] + self.code[pos+2] * 256
arg = self.code[pos + 1] + self.code[pos + 2] * 256
return arg
def next_offset(self, op, offset):
@@ -216,9 +242,9 @@ class Scanner(object):
op = self.code[i]
if op in self.JUMP_OPS:
dest = self.get_target(i, op)
print('%i\t%s\t%i' % (i, self.opname[op], dest))
print("%i\t%s\t%i" % (i, self.opname[op], dest))
else:
print('%i\t%s\t' % (i, self.opname[op]))
print("%i\t%s\t" % (i, self.opname[op]))
def first_instr(self, start, end, instr, target=None, exact=True):
"""
@@ -232,11 +258,9 @@ class Scanner(object):
Return index to it or None if not found.
"""
code = self.code
assert(start >= 0 and end <= len(code))
assert start >= 0 and end <= len(code)
try:
None in instr
except:
if not isinstance(instr, list):
instr = [instr]
result_offset = None
@@ -274,9 +298,7 @@ class Scanner(object):
if not (start >= 0 and end <= len(code)):
return None
try:
None in instr
except:
if not isinstance(instr, list):
instr = [instr]
result_offset = None
@@ -287,7 +309,7 @@ class Scanner(object):
op = code[offset]
if op == self.opc.EXTENDED_ARG:
arg = code2num(code, offset+1) | extended_arg
arg = code2num(code, offset + 1) | extended_arg
extended_arg = extended_arg_val(self.opc, arg)
continue
@@ -365,7 +387,7 @@ class Scanner(object):
"""
code = self.code
assert(start >= 0 and end <= len(code))
assert start >= 0 and end <= len(code)
try:
None in instr
@@ -379,7 +401,7 @@ class Scanner(object):
op = code[offset]
if op == self.opc.EXTENDED_ARG:
arg = code2num(code, offset+1) | extended_arg
arg = code2num(code, offset + 1) | extended_arg
extended_arg = extended_arg_val(self.opc, arg)
continue
@@ -423,8 +445,11 @@ class Scanner(object):
last_was_extarg = False
n = len(instructions)
for i, inst in enumerate(instructions):
if (inst.opname == 'EXTENDED_ARG'
and i+1 < n and instructions[i+1].opname != 'MAKE_FUNCTION'):
if (
inst.opname == "EXTENDED_ARG"
and i + 1 < n
and instructions[i + 1].opname != "MAKE_FUNCTION"
):
last_was_extarg = True
starts_line = inst.starts_line
is_jump_target = inst.is_jump_target
@@ -435,13 +460,15 @@ class Scanner(object):
# j = self.stmts.index(inst.offset)
# self.lines[j] = offset
new_inst = inst._replace(starts_line=starts_line,
is_jump_target=is_jump_target,
offset=offset)
new_inst = inst._replace(
starts_line=starts_line,
is_jump_target=is_jump_target,
offset=offset,
)
inst = new_inst
if i < n:
new_prev = self.prev_op[instructions[i].offset]
j = instructions[i+1].offset
j = instructions[i + 1].offset
old_prev = self.prev_op[j]
while self.prev_op[j] == old_prev and j < n:
self.prev_op[j] = new_prev
@@ -463,9 +490,12 @@ class Scanner(object):
for i in ifs:
# For each offset, if line number of current and next op
# is the same
if self.lines[i].l_no == self.lines[i+3].l_no:
if self.lines[i].l_no == self.lines[i + 3].l_no:
# Skip last op on line if it is some sort of POP_JUMP.
if self.code[self.prev[self.lines[i].next]] in (self.opc.PJIT, self.opc.PJIF):
if self.code[self.prev[self.lines[i].next]] in (
self.opc.PJIT,
self.opc.PJIF,
):
continue
filtered.append(i)
return filtered
@@ -475,8 +505,8 @@ class Scanner(object):
def restrict_to_parent(self, target, parent):
"""Restrict target to parent structure boundaries."""
if not (parent['start'] < target < parent['end']):
target = parent['end']
if not (parent["start"] < target < parent["end"]):
target = parent["end"]
return target
def setTokenClass(self, tokenClass):
@@ -484,6 +514,7 @@ class Scanner(object):
self.Token = tokenClass
return self.Token
def parse_fn_counts(argc):
return ((argc & 0xFF), (argc >> 8) & 0xFF, (argc >> 16) & 0x7FFF)
@@ -496,8 +527,10 @@ def get_scanner(version, is_pypy=False, show_asm=None):
raise RuntimeError("Unknown Python version in xdis %s" % version)
canonic_version = canonic_python_version[version]
if canonic_version not in CANONIC2VERSION:
raise RuntimeError("Unsupported Python version %s (canonic %s)"
% (version, canonic_version))
raise RuntimeError(
"Unsupported Python version %s (canonic %s)"
% (version, canonic_version)
)
version = CANONIC2VERSION[canonic_version]
# Pick up appropriate scanner
@@ -505,24 +538,34 @@ def get_scanner(version, is_pypy=False, show_asm=None):
v_str = "%s" % (int(version * 10))
try:
import importlib
if is_pypy:
scan = importlib.import_module("uncompyle6.scanners.pypy%s" % v_str)
else:
scan = importlib.import_module("uncompyle6.scanners.scanner%s" % v_str)
if False: print(scan) # Avoid unused scan
if False:
print(scan) # Avoid unused scan
except ImportError:
if is_pypy:
exec("import uncompyle6.scanners.pypy%s as scan" % v_str,
locals(), globals())
exec(
"import uncompyle6.scanners.pypy%s as scan" % v_str,
locals(),
globals(),
)
else:
exec("import uncompyle6.scanners.scanner%s as scan" % v_str,
locals(), globals())
exec(
"import uncompyle6.scanners.scanner%s as scan" % v_str,
locals(),
globals(),
)
if is_pypy:
scanner = eval("scan.ScannerPyPy%s(show_asm=show_asm)" % v_str,
locals(), globals())
scanner = eval(
"scan.ScannerPyPy%s(show_asm=show_asm)" % v_str, locals(), globals()
)
else:
scanner = eval("scan.Scanner%s(show_asm=show_asm)" % v_str,
locals(), globals())
scanner = eval(
"scan.Scanner%s(show_asm=show_asm)" % v_str, locals(), globals()
)
else:
raise RuntimeError("Unsupported Python version %s" % version)
return scanner
@@ -530,8 +573,9 @@ def get_scanner(version, is_pypy=False, show_asm=None):
if __name__ == "__main__":
import inspect, uncompyle6
co = inspect.currentframe().f_code
# scanner = get_scanner('2.7.13', True)
# scanner = get_scanner(sys.version[:5], False)
scanner = get_scanner(uncompyle6.PYTHON_VERSION, IS_PYPY, True)
tokens, customize = scanner.ingest(co, {}, show_asm='after')
tokens, customize = scanner.ingest(co, {}, show_asm="after")