You've already forked python-uncompyle6
mirror of
https://github.com/rocky/python-uncompyle6.git
synced 2025-08-03 00:45:53 +08:00
1529 lines
62 KiB
Python
1529 lines
62 KiB
Python
# Copyright (c) 2015-2019, 2021-2023 by Rocky Bernstein
|
|
# Copyright (c) 2005 by Dan Pascu <dan@windowmaker.org>
|
|
# Copyright (c) 2000-2002 by hartmut Goebel <h.goebel@crazy-compilers.com>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
Python 3 Generic bytecode scanner/deparser
|
|
|
|
This overlaps in various ways with Python 3's dis module, but it can be run from
|
|
Python versions other than the version running this code. Notably,
|
|
run from Python version 2.
|
|
|
|
Also we *modify* the instruction sequence to assist deparsing code.
|
|
For example:
|
|
- we add "COME_FROM" instructions to help in figuring out
|
|
conditional branching and looping.
|
|
- LOAD_CONSTs are classified further into the type of thing
|
|
they load:
|
|
lambda's, genexpr's, {dict,set,list} comprehension's,
|
|
- PARAMETER counts appended {CALL,MAKE}_FUNCTION, BUILD_{TUPLE,SET,SLICE}
|
|
|
|
Finally we save token information.
|
|
"""
|
|
|
|
import xdis
|
|
|
|
# Get all the opcodes into globals
|
|
import xdis.opcodes.opcode_33 as op3
|
|
from xdis import Instruction, instruction_size, iscode
|
|
from xdis.bytecode import _get_const_info
|
|
|
|
from uncompyle6.scanner import CONST_COLLECTIONS, Scanner, parse_fn_counts_30_35
|
|
from uncompyle6.scanners.tok import Token
|
|
from uncompyle6.util import get_code_name
|
|
|
|
# Copy every entry of op3.opmap (op3 is xdis.opcodes.opcode_33) into this
# module's globals, so that each key of the map becomes a module-level name.
globals().update(op3.opmap)
|
|
|
|
|
|
class Scanner3(Scanner):
|
|
def __init__(self, version, show_asm=None, is_pypy=False):
    """
    Initialize the scanner and build the opcode classification sets
    that the other scanner methods consult.

    :param version: version tuple, e.g. (3, 6), of the bytecode that
        will be scanned; it selects which opcodes go into each set.
    :param show_asm: forwarded unchanged to Scanner.__init__().
    :param is_pypy: forwarded to Scanner.__init__(); when true,
        CALL_METHOD is also treated as a variable-argument opcode.
    """
    Scanner.__init__(self, version, show_asm, is_pypy)

    # Create opcode classification sets
    # Note: super initialization above initializes self.opc

    # For ops that start SETUP_ ... we will add COME_FROM with these names
    # at their targets.
    # Some blocks and END_ statements. And they can start
    # a new statement
    if self.version < (3, 8):
        setup_ops = [
            self.opc.SETUP_LOOP,
            self.opc.SETUP_EXCEPT,
            self.opc.SETUP_FINALLY,
        ]
        self.setup_ops_no_loop = frozenset(setup_ops) - frozenset(
            [self.opc.SETUP_LOOP]
        )
    else:
        setup_ops = [self.opc.SETUP_FINALLY]
        self.setup_ops_no_loop = frozenset(setup_ops)

    # Note: SETUP_WITH is deliberately added *after* setup_ops_no_loop
    # has been computed, so it is in setup_ops only.
    if self.version >= (3, 2):
        setup_ops.append(self.opc.SETUP_WITH)
    self.setup_ops = frozenset(setup_ops)

    if self.version[:2] == (3, 0):
        self.pop_jump_tf = frozenset(
            [self.opc.JUMP_IF_FALSE, self.opc.JUMP_IF_TRUE]
        )
        self.not_continue_follow = ("END_FINALLY", "POP_BLOCK", "POP_TOP")
    else:
        self.pop_jump_tf = frozenset([self.opc.PJIF, self.opc.PJIT])
        self.not_continue_follow = ("END_FINALLY", "POP_BLOCK")

    # Opcodes that can start a statement.
    statement_opcodes = [
        self.opc.POP_BLOCK,
        self.opc.STORE_FAST,
        self.opc.DELETE_FAST,
        self.opc.STORE_DEREF,
        self.opc.STORE_GLOBAL,
        self.opc.DELETE_GLOBAL,
        self.opc.STORE_NAME,
        self.opc.DELETE_NAME,
        self.opc.STORE_ATTR,
        self.opc.DELETE_ATTR,
        self.opc.STORE_SUBSCR,
        self.opc.POP_TOP,
        self.opc.DELETE_SUBSCR,
        self.opc.END_FINALLY,
        self.opc.RETURN_VALUE,
        self.opc.RAISE_VARARGS,
        self.opc.PRINT_EXPR,
        self.opc.JUMP_ABSOLUTE,
    ]

    if self.version < (3, 8):
        statement_opcodes += [self.opc.BREAK_LOOP, self.opc.CONTINUE_LOOP]

    self.statement_opcodes = frozenset(statement_opcodes) | self.setup_ops_no_loop

    # Opcodes that can start a "store" non-terminal.
    # FIXME: JUMP_ABSOLUTE is weird. What's up with that?
    self.designator_ops = frozenset(
        [
            self.opc.STORE_FAST,
            self.opc.STORE_NAME,
            self.opc.STORE_GLOBAL,
            self.opc.STORE_DEREF,
            self.opc.STORE_ATTR,
            self.opc.STORE_SUBSCR,
            self.opc.UNPACK_SEQUENCE,
            self.opc.JUMP_ABSOLUTE,
            self.opc.UNPACK_EX,
        ]
    )

    # Compare only (major, minor), as the 3.0 test above does, so that a
    # longer version tuple such as (3, 0, 1) is still handled as 3.0.
    if self.version[:2] > (3, 0):
        self.jump_if_pop = frozenset(
            [self.opc.JUMP_IF_FALSE_OR_POP, self.opc.JUMP_IF_TRUE_OR_POP]
        )

        self.pop_jump_if_pop = frozenset(
            [
                self.opc.JUMP_IF_FALSE_OR_POP,
                self.opc.JUMP_IF_TRUE_OR_POP,
                self.opc.POP_JUMP_IF_TRUE,
                self.opc.POP_JUMP_IF_FALSE,
            ]
        )
        # Not really a set, but still classification-like
        self.statement_opcode_sequences = [
            (self.opc.POP_JUMP_IF_FALSE, self.opc.JUMP_FORWARD),
            (self.opc.POP_JUMP_IF_FALSE, self.opc.JUMP_ABSOLUTE),
            (self.opc.POP_JUMP_IF_TRUE, self.opc.JUMP_FORWARD),
            (self.opc.POP_JUMP_IF_TRUE, self.opc.JUMP_ABSOLUTE),
        ]

    else:
        self.jump_if_pop = frozenset([])
        self.pop_jump_if_pop = frozenset([])
        # Not really a set, but still classification-like
        self.statement_opcode_sequences = [
            (self.opc.JUMP_FORWARD,),
            (self.opc.JUMP_ABSOLUTE,),
            (self.opc.JUMP_FORWARD,),
            (self.opc.JUMP_ABSOLUTE,),
        ]

    # FIXME: remove this and use instead info from xdis.
    # Opcodes that take a variable number of arguments
    # (expr's)
    varargs_ops = set(
        [
            self.opc.BUILD_LIST,
            self.opc.BUILD_TUPLE,
            self.opc.BUILD_SET,
            self.opc.BUILD_SLICE,
            self.opc.BUILD_MAP,
            self.opc.UNPACK_SEQUENCE,
            self.opc.RAISE_VARARGS,
        ]
    )

    if is_pypy or self.version >= (3, 7):
        varargs_ops.add(self.opc.CALL_METHOD)
    if self.version >= (3, 5):
        varargs_ops |= set(
            [
                self.opc.BUILD_SET_UNPACK,
                self.opc.BUILD_MAP_UNPACK,  # we will handle this later
                self.opc.BUILD_LIST_UNPACK,
                self.opc.BUILD_TUPLE_UNPACK,
            ]
        )
        if self.version >= (3, 6):
            varargs_ops.add(self.opc.BUILD_CONST_KEY_MAP)
            # Below is in bit order, "default" = bit 0, "closure" = bit 3
            self.MAKE_FUNCTION_FLAGS = tuple(
                """
         default keyword-only annotation closure""".split()
            )

    self.varargs_ops = frozenset(varargs_ops)
    # FIXME: remove the above in favor of:
    #   self.varargs_ops = frozenset(self.opc.hasvargs)
    return
|
|
|
|
def bound_collection_from_inst(
    self, insts, next_tokens, inst, t, i, collection_type
):
    """
    Try to replace a sequence of instructions that ends with a
    BUILD_xxx with a sequence that can be parsed much faster, by
    inserting a token boundary at the beginning of the sequence.

    :param insts: the instruction list being tokenized.
    :param next_tokens: the tokens produced so far; its last ``count``
        entries are dropped and replaced in the returned list.
    :param inst: the BUILD_xxx instruction (not used in this method).
    :param t: the token for the BUILD_xxx instruction; ``t.attr`` is
        the number of collection items.
    :param i: index of the BUILD_xxx instruction in ``insts``.
    :param collection_type: an entry of CONST_COLLECTIONS, for example
        "CONST_LIST" or "CONST_DICT".
    :return: a new token list, or None when the replacement is not
        applicable (collection too small, not enough preceding
        instructions, or an item is not loaded by a plain LOAD_xxx).
    """
    count = t.attr
    assert isinstance(count, int)

    if collection_type == "CONST_DICT":
        # constant dictionaries work via BUILD_CONST_KEY_MAP and
        # handle the values() like sets and lists.
        # However the keys() are an LOAD_CONST of the keys.
        # adjust offset to account for this
        count += 1

    # For small lists don't bother
    if count < 5:
        return None

    # The bound is checked *after* the CONST_DICT adjustment: otherwise
    # collection_start could go negative and silently index from the
    # end of insts.  Like bound_map_from_inst(), decline rather than fail.
    if count > i:
        return None

    collection_start = i - count

    for j in range(collection_start, i):
        if insts[j].opname not in (
            "LOAD_ASSERT",
            "LOAD_CODE",
            "LOAD_CONST",
            "LOAD_FAST",
            "LOAD_GLOBAL",
            "LOAD_NAME",
            "LOAD_STR",
        ):
            return None

    collection_enum = CONST_COLLECTIONS.index(collection_type)

    # If we get here, all of the count instructions before insts[i] are
    # simple loads, so we can add a boundary marker and change each load
    # into an ADD_VALUE token.
    new_tokens = next_tokens[:-count]
    start_offset = insts[collection_start].offset
    new_tokens.append(
        Token(
            opname="COLLECTION_START",
            attr=collection_enum,
            pattr=collection_type,
            offset="%s_0" % start_offset,
            linestart=False,
            has_arg=True,
            has_extended_arg=False,
            opc=self.opc,
        )
    )
    for j in range(collection_start, i):
        new_tokens.append(
            Token(
                opname="ADD_VALUE",
                attr=insts[j].argval,
                pattr=insts[j].argrepr,
                offset=insts[j].offset,
                linestart=insts[j].starts_line,
                has_arg=True,
                has_extended_arg=False,
                opc=self.opc,
            )
        )
    new_tokens.append(
        Token(
            opname="BUILD_%s" % collection_type,
            attr=t.attr,
            pattr=t.pattr,
            offset=t.offset,
            linestart=t.linestart,
            has_arg=t.has_arg,
            has_extended_arg=False,
            opc=t.opc,
        )
    )
    return new_tokens
|
|
|
|
def bound_map_from_inst(self, insts, next_tokens, inst, t, i):
    """
    Try to replace a sequence of instructions that ends with a
    BUILD_MAP with a sequence that can be parsed much faster, by
    inserting a token boundary at the beginning of the sequence.

    :param insts: the instruction list being tokenized.
    :param next_tokens: the tokens produced so far; its last
        ``2 * count`` entries are dropped and replaced in the result.
    :param inst: the BUILD_MAP instruction (not used in this method).
    :param t: the token for the BUILD_MAP instruction; ``t.attr`` is
        taken as the number of key/value pairs.
    :param i: index of the BUILD_MAP instruction in ``insts``.
    :return: a new token list, or None when the replacement is not
        applicable (map too small, not enough preceding instructions,
        or a key or value is not loaded with LOAD_CONST).
    """
    count = t.attr
    assert isinstance(count, int)

    # For small lists don't bother
    if count < 5:
        return None

    # Each entry needs two instructions (key and value).  If fewer than
    # 2 * count instructions precede BUILD_MAP, they cannot all be
    # LOAD_CONST pairs for this map, so decline instead of failing.
    if count * 2 > i:
        return None

    collection_start = i - (count * 2)

    for j in range(collection_start, i, 2):
        if insts[j].opname not in ("LOAD_CONST",):
            return None
        if insts[j + 1].opname not in ("LOAD_CONST",):
            return None

    collection_enum = CONST_COLLECTIONS.index("CONST_MAP")

    # If we get here, all of the 2 * count instructions before insts[i]
    # are LOAD_CONST, so we can add a boundary marker and change each
    # LOAD_CONST pair into ADD_KEY / ADD_VALUE tokens.
    new_tokens = next_tokens[: -(2 * count)]
    start_offset = insts[collection_start].offset
    new_tokens.append(
        Token(
            opname="COLLECTION_START",
            attr=collection_enum,
            pattr="CONST_MAP",
            offset="%s_0" % start_offset,
            linestart=False,
            has_arg=True,
            has_extended_arg=False,
            opc=self.opc,
        )
    )
    for j in range(collection_start, i, 2):
        new_tokens.append(
            Token(
                opname="ADD_KEY",
                attr=insts[j].argval,
                pattr=insts[j].argrepr,
                offset=insts[j].offset,
                linestart=insts[j].starts_line,
                has_arg=True,
                has_extended_arg=False,
                opc=self.opc,
            )
        )
        new_tokens.append(
            Token(
                opname="ADD_VALUE",
                attr=insts[j + 1].argval,
                pattr=insts[j + 1].argrepr,
                offset=insts[j + 1].offset,
                linestart=insts[j + 1].starts_line,
                has_arg=True,
                has_extended_arg=False,
                opc=self.opc,
            )
        )
    new_tokens.append(
        Token(
            opname="BUILD_DICT_OLDER",
            attr=t.attr,
            pattr=t.pattr,
            offset=t.offset,
            linestart=t.linestart,
            has_arg=t.has_arg,
            has_extended_arg=False,
            opc=t.opc,
        )
    )
    return new_tokens
|
|
|
|
def ingest(self, co, classname=None, code_objects={}, show_asm=None):
    """
    Create "tokens" from the bytecode of a Python code object. Largely
    these are the opcode name, but in some cases that has been modified
    to make parsing easier.

    Some transformations are made to assist the deparsing grammar:
       -  various types of LOAD_CONST's are categorized in terms of what they load
       -  COME_FROM instructions are added to assist parsing control structures
       -  operands with stack argument counts or flag masks are appended to the
          opcode name, e.g.:
            *  BUILD_LIST, BUILD_SET
            *  MAKE_FUNCTION and FUNCTION_CALLS append the number of positional
               arguments
       -  EXTENDED_ARGS instructions are removed

    Also, when we encounter certain tokens, we add them to a set
    which will cause custom grammar rules. Specifically, variable
    arg tokens like MAKE_FUNCTION or BUILD_LIST cause specific rules
    for the specific number of arguments they take.

    :param co: the code object to tokenize.
    :param classname: not used in this method.
    :param code_objects: not used in this method.
    :param show_asm: "before", "after" or "both" to print the assembly
        before and/or after tokenization; a false value means use
        self.show_asm.
    :return: a tuple ``(tokens, customize)`` where ``tokens`` is a list
        of uncompyle6 Token's and ``customize`` is a dictionary.
    """

    if not show_asm:
        show_asm = self.show_asm

    bytecode = self.build_instructions(co)

    # show_asm = 'both'
    if show_asm in ("both", "before"):
        print("\n# ---- before tokenization:")
        bytecode.disassemble_bytes(
            co.co_code,
            varnames=co.co_varnames,
            names=co.co_names,
            constants=co.co_consts,
            cells=bytecode._cell_names,
            linestarts=bytecode._linestarts,
            asm_format="extended",
        )

    # "customize" is in the process of going away here
    customize = {}

    if self.is_pypy:
        customize["PyPy"] = 0

    # Scan for assertions. Later we will
    # turn 'LOAD_GLOBAL' to 'LOAD_ASSERT'.
    # 'LOAD_ASSERT' is used in assert statements.
    self.load_asserts = set()

    n = len(self.insts)
    for i, inst in enumerate(self.insts):
        opname = inst.opname
        # We need to detect the difference between:
        #   raise AssertionError
        #  and
        #   assert ...
        # If we have a JUMP_FORWARD after the
        # RAISE_VARARGS then we have a "raise" statement
        # else we have an "assert" statement.
        if self.version[:2] == (3, 0):
            # Like 2.6, 3.0 doesn't have POP_JUMP_IF... so we have
            # to go through more machinations
            assert_can_follow = opname == "POP_TOP" and i + 1 < n
            if assert_can_follow:
                prev_inst = self.insts[i - 1]
                assert_can_follow = prev_inst.opname in (
                    "JUMP_IF_TRUE",
                    "JUMP_IF_FALSE",
                )
                jump_if_inst = prev_inst
        else:
            assert_can_follow = (
                opname in ("POP_JUMP_IF_TRUE", "POP_JUMP_IF_FALSE") and i + 1 < n
            )
            jump_if_inst = inst
        if assert_can_follow:
            next_inst = self.insts[i + 1]
            if (
                next_inst.opname == "LOAD_GLOBAL"
                and next_inst.argval == "AssertionError"
                and jump_if_inst.argval
            ):
                raise_idx = self.offset2inst_index[
                    self.prev_op[jump_if_inst.argval]
                ]
                raise_inst = self.insts[raise_idx]
                if raise_inst.opname.startswith("RAISE_VARARGS"):
                    self.load_asserts.add(next_inst.offset)
                    pass
                pass

    # Get jump targets
    # Format: {target offset: [jump offsets]}
    jump_targets = self.find_jump_targets(show_asm)
    # print("XXX2", jump_targets)

    last_op_was_break = False
    new_tokens = []

    for i, inst in enumerate(self.insts):
        opname = inst.opname
        argval = inst.argval
        pattr = inst.argrepr
        op = inst.opcode

        t = Token(
            opname=opname,
            attr=argval,
            pattr=pattr,
            offset=inst.offset,
            linestart=inst.starts_line,
            op=inst.opcode,
            has_arg=inst.has_arg,
            has_extended_arg=inst.has_extended_arg,
            opc=self.opc,
        )

        # things that smash new_tokens like BUILD_LIST have to come first.
        if opname in (
            "BUILD_CONST_KEY_MAP",
            "BUILD_LIST",
            "BUILD_SET",
        ):
            if opname.startswith("BUILD_CONST_KEY_MAP"):
                collection_type = "DICT"
            else:
                collection_type = opname.split("_")[1]
            try_tokens = self.bound_collection_from_inst(
                self.insts, new_tokens, inst, t, i, "CONST_%s" % collection_type
            )
            if try_tokens is not None:
                new_tokens = try_tokens
                continue
        elif opname in ("BUILD_MAP",):
            try_tokens = self.bound_map_from_inst(
                self.insts,
                new_tokens,
                inst,
                t,
                i,
            )
            if try_tokens is not None:
                new_tokens = try_tokens
                continue

        if opname == "EXTENDED_ARG":
            # FIXME: The EXTENDED_ARG is used to signal annotation
            # parameters
            if i + 1 < n and self.insts[i + 1].opcode != self.opc.MAKE_FUNCTION:
                continue

        if inst.offset in jump_targets:
            jump_idx = 0
            # We want to process COME_FROMs to the same offset to be in *descending*
            # offset order so we have the larger range or biggest instruction interval
            # last. (I think they are sorted in increasing order, but for safety
            # we sort them). That way, specific COME_FROM tags will match up
            # properly. For example, a "loop" with an "if" nested in it should have the
            # "loop" tag last so the grammar rule matches that properly.
            for jump_offset in sorted(jump_targets[inst.offset], reverse=True):
                come_from_name = "COME_FROM"
                come_from_opname = self.opname_for_offset(jump_offset)
                if come_from_opname == "EXTENDED_ARG":
                    # Skip over the EXTENDED_ARG prefix. The size to skip
                    # is that of the EXTENDED_ARG at jump_offset, not that
                    # of the instruction currently being tokenized.
                    j = xdis.next_offset(
                        self.opc.EXTENDED_ARG, self.opc, jump_offset
                    )
                    come_from_opname = self.opname_for_offset(j)

                if come_from_opname.startswith("SETUP_"):
                    come_from_type = come_from_opname[len("SETUP_") :]
                    come_from_name = "COME_FROM_%s" % come_from_type
                    pass
                elif inst.offset in self.except_targets:
                    come_from_name = "COME_FROM_EXCEPT_CLAUSE"
                new_tokens.append(
                    Token(
                        come_from_name,
                        jump_offset,
                        repr(jump_offset),
                        offset="%s_%s" % (inst.offset, jump_idx),
                        has_arg=True,
                        opc=self.opc,
                    )
                )
                jump_idx += 1
                pass
            pass
        elif inst.offset in self.else_start:
            end_offset = self.else_start[inst.offset]
            new_tokens.append(
                Token(
                    "ELSE",
                    None,
                    repr(end_offset),
                    offset="%s" % (inst.offset),
                    has_arg=True,
                    opc=self.opc,
                )
            )

            pass

        if op in self.opc.CONST_OPS:
            const = argval
            if iscode(const):
                co_name = const.co_name
                if co_name == "<lambda>":
                    assert opname == "LOAD_CONST"
                    opname = "LOAD_LAMBDA"
                elif co_name == "<genexpr>":
                    opname = "LOAD_GENEXPR"
                elif co_name == "<dictcomp>":
                    opname = "LOAD_DICTCOMP"
                elif co_name == "<setcomp>":
                    opname = "LOAD_SETCOMP"
                elif co_name == "<listcomp>":
                    opname = "LOAD_LISTCOMP"
                else:
                    opname = "LOAD_CODE"
                # verify() uses 'pattr' for comparison, since 'attr'
                # now holds Code(const) and thus can not be used
                # for comparison (todo: think about changing this)
                # pattr = 'code_object @ 0x%x %s->%s' %\
                # (id(const), const.co_filename, const.co_name)
                pattr = "<code_object " + const.co_name + ">"
            # NOTE(review): "unicode" (and "intern" further below) are
            # Python 2 builtins; presumably this file is run under
            # Python 2 -- confirm before running it on Python 3.
            elif isinstance(const, str) or isinstance(const, unicode):
                opname = "LOAD_STR"
            else:
                if isinstance(inst.arg, int) and inst.arg < len(co.co_consts):
                    argval, _ = _get_const_info(inst.arg, co.co_consts)
                # Why don't we use _ above for "pattr" rather than "const"?
                # This *is* a little hoaky, but we have to coordinate with
                # other parts like n_LOAD_CONST in pysource.py for example.
                pattr = const
                pass
        elif opname == "LOAD_FAST" and argval == ".0":
            # Used as the parameter of a list expression
            opname = "LOAD_ARG"

        elif opname in ("MAKE_FUNCTION", "MAKE_CLOSURE"):
            if self.version >= (3, 6):
                # 3.6+ doesn't have MAKE_CLOSURE, so opname == 'MAKE_FUNCTION'
                flags = argval
                # FIXME: generalize this
                if flags == 8:
                    opname = "MAKE_FUNCTION_CLOSURE"
                elif flags == 9:
                    opname = "MAKE_FUNCTION_CLOSURE_POS"
                else:
                    opname = "MAKE_FUNCTION_%d" % (flags)
                # Unpack the flag bits, lowest bit first, one entry per
                # name in self.MAKE_FUNCTION_FLAGS.
                attr = []
                for flag in self.MAKE_FUNCTION_FLAGS:
                    bit = flags & 1
                    attr.append(bit)
                    flags >>= 1
                attr = attr[:4]  # remove last value: attr[5] == False
            else:
                pos_args, name_pair_args, annotate_args = parse_fn_counts_30_35(
                    inst.argval
                )

                pattr = "%s positional, %s keyword only, %s annotated" % (
                    pos_args,
                    name_pair_args,
                    annotate_args,
                )

                if name_pair_args > 0 and annotate_args > 0:
                    # FIXME: this should probably be K_
                    opname += "_N%s_A%s" % (name_pair_args, annotate_args)
                    pass
                elif annotate_args > 0:
                    opname += "_A_%s" % annotate_args
                    pass
                elif name_pair_args > 0:
                    opname += "_N_%s" % name_pair_args
                    pass
                else:
                    # Rule customization matches MAKE_FUNCTION_...
                    # so make sure to add the "_"
                    opname += "_0"

                attr = (pos_args, name_pair_args, annotate_args)

            new_tokens.append(
                Token(
                    opname=opname,
                    attr=attr,
                    pattr=pattr,
                    offset=inst.offset,
                    linestart=inst.starts_line,
                    op=op,
                    has_arg=inst.has_arg,
                    opc=self.opc,
                )
            )
            continue
        elif op in self.varargs_ops:
            pos_args = argval
            if self.is_pypy and not pos_args and opname == "BUILD_MAP":
                opname = "BUILD_MAP_n"
            else:
                opname = "%s_%d" % (opname, pos_args)

        elif self.is_pypy and opname in ("JUMP_IF_NOT_DEBUG", "CALL_FUNCTION"):
            if opname == "JUMP_IF_NOT_DEBUG":
                # The value in the dict is in special cases in semantic actions, such
                # as JUMP_IF_NOT_DEBUG. The value is not used in these cases, so we put
                # in arbitrary value 0.
                customize[opname] = 0
            elif self.version >= (3, 6) and argval > 255:
                opname = "CALL_FUNCTION_KW"
                pass

        elif opname == "UNPACK_EX":
            # FIXME: try with scanner and parser by
            # changing argval
            before_args = argval & 0xFF
            after_args = (argval >> 8) & 0xFF
            pattr = "%d before vararg, %d after" % (before_args, after_args)
            argval = (before_args, after_args)
            opname = "%s_%d+%d" % (opname, before_args, after_args)

        elif op == self.opc.JUMP_ABSOLUTE:
            # Further classify JUMP_ABSOLUTE into backward jumps
            # which are used in loops, and "CONTINUE" jumps which
            # may appear in a "continue" statement.  The loop-type
            # and continue-type jumps will help us classify loop
            # boundaries The continue-type jumps help us get
            # "continue" statements with would otherwise be turned
            # into a "pass" statement because JUMPs are sometimes
            # ignored in rules as just boundary overhead. In
            # comprehensions we might sometimes classify JUMP_BACK
            # as CONTINUE, but that's okay since we add a grammar
            # rule for that.
            pattr = argval
            target = self.get_target(inst.offset)
            if target <= inst.offset:
                next_opname = self.insts[i + 1].opname

                # 'Continue's include jumps to loops that are not
                # and the end of a block which follow with POP_BLOCK and COME_FROM_LOOP.
                # If the JUMP_ABSOLUTE is to a FOR_ITER and it is followed by another JUMP_FORWARD
                # then we'll take it as a "continue".
                is_continue = (
                    self.insts[self.offset2inst_index[target]].opname == "FOR_ITER"
                    and self.insts[i + 1].opname == "JUMP_FORWARD"
                )

                if (
                    self.version[:2] == (3, 0)
                    and self.insts[i + 1].opname == "JUMP_FORWARD"
                    and not is_continue
                ):
                    target_prev = self.offset2inst_index[self.prev_op[target]]
                    is_continue = self.insts[target_prev].opname == "SETUP_LOOP"

                if is_continue or (
                    inst.offset in self.stmts
                    and (
                        inst.starts_line
                        and next_opname not in self.not_continue_follow
                    )
                ):
                    opname = "CONTINUE"
                else:
                    opname = "JUMP_BACK"
                    # FIXME: this is a hack to catch stuff like:
                    #   if x: continue
                    # the "continue" is not on a new line.
                    # There are other situations where we don't catch
                    # CONTINUE as well.
                    if (
                        new_tokens[-1].kind == "JUMP_BACK"
                        and new_tokens[-1].attr <= argval
                    ):
                        if new_tokens[-2].kind == "BREAK_LOOP":
                            del new_tokens[-1]
                        else:
                            # intern is used because we are changing the *previous* token
                            new_tokens[-1].kind = intern("CONTINUE")
                if last_op_was_break and opname == "CONTINUE":
                    last_op_was_break = False
                    continue

        # FIXME: go over for Python 3.6+. This is sometimes wrong
        elif op == self.opc.RETURN_VALUE:
            if inst.offset in self.return_end_ifs:
                opname = "RETURN_END_IF"

        elif inst.offset in self.load_asserts:
            opname = "LOAD_ASSERT"

        last_op_was_break = opname == "BREAK_LOOP"
        t.kind = opname
        t.attr = argval
        t.pattr = pattr
        new_tokens.append(t)
        pass

    if show_asm in ("both", "after"):
        print("\n# ---- after tokenization:")
        for t in new_tokens:
            print(t.format(line_prefix=""))
        print()
    return new_tokens, customize
|
|
|
|
def find_jump_targets(self, debug):
    """
    Find every offset in the bytecode that is the destination of a
    jump, i.e. a place where a COME_FROM instruction might be inserted.

    The per-code-object bookkeeping containers (self.structs,
    self.loops, self.fixed_jumps, and so on) are reset here, and
    build_statement_indices() and detect_control_flow() are run as
    part of the scan.

    :param debug: if "both" or "after", self.structs is pretty-printed
        once the scan is done.
    :return: a dictionary mapping a target offset to the list of
        offsets that jump to it; one instruction can be jumped to
        from several instructions.
    """
    bytecode = self.code
    self.structs = [{"type": "root", "start": 0, "end": len(bytecode) - 1}]

    # All loop entry points
    self.loops = []

    # Map fixed jumps to their real destination
    self.fixed_jumps = {}
    self.except_targets = {}
    self.ignore_if = set()
    self.build_statement_indices()
    self.else_start = {}

    # Containers filled by detect_control_flow()
    self.not_continue = set()
    self.return_end_ifs = set()
    self.setup_loop_targets = {}  # target given setup_loop offset
    self.setup_loops = {}  # setup_loop offset given target

    targets = {}
    for inst_index, inst in enumerate(self.insts):
        offset = inst.offset
        op = inst.opcode

        # Determine structures and fix jumps in Python versions
        # since 2.3
        self.detect_control_flow(offset, targets, inst_index)

        if not inst.has_arg:
            # An argument-less END_FINALLY can still have a fixed jump.
            if op == self.opc.END_FINALLY and offset in self.fixed_jumps:
                destination = self.fixed_jumps[offset]
                targets[destination] = targets.get(destination, []) + [offset]
            continue

        destination = self.fixed_jumps.get(offset)
        arg = inst.arg

        starts_with_extended_arg = (
            self.version >= (3, 6)
            and bytecode[offset] == self.opc.EXTENDED_ARG
        )
        following_offset = xdis.next_offset(op, self.opc, offset)
        if starts_with_extended_arg:
            # Step over one more instruction when the bytecode at this
            # offset is an EXTENDED_ARG.
            following_offset = xdis.next_offset(op, self.opc, following_offset)

        if destination is None:
            if op in self.opc.hasjrel and op != self.opc.FOR_ITER:
                # Relative jump: the argument is counted from the
                # following instruction.
                destination = following_offset + arg
            elif (
                op in self.opc.hasjabs
                and op in self.jump_if_pop
                and arg > offset
            ):
                destination = arg

        if destination is not None and destination != -1:
            targets[destination] = targets.get(destination, []) + [offset]

    # DEBUG:
    if debug in ("both", "after"):
        import pprint as pp

        pp.pprint(self.structs)

    return targets
|
|
|
|
def build_statement_indices(self):
    """
    Work out which bytecode offsets start a statement.

    Sets two attributes:
      - self.stmts: the set of offsets considered statement starts.
      - self.next_stmt: a 'list-map' which, indexed by an op offset,
        holds the offset of the start of the next statement (or the
        code length after the last statement).
    """
    bytecode = self.code
    code_len = len(bytecode)

    # Preliminary statement offsets: anything whose opcode is one of
    # the plain statement opcodes.
    candidates = self.inst_matches(0, code_len, self.statement_opcodes)
    statement_offsets = self.stmts = set(candidates)

    # Add the offsets found by matching the opcode sequences.
    sequence_stmts = set()
    for op_sequence in self.statement_opcode_sequences:
        search_end = code_len - (len(op_sequence) + 1)
        for first in self.op_range(0, search_end):
            cursor = first
            matched = True
            for wanted_op in op_sequence:
                if wanted_op != bytecode[cursor]:
                    matched = False
                    break
                cursor += instruction_size(bytecode[cursor], self.opc)

            if matched:
                # cursor is just past the sequence; step back to the
                # sequence's last instruction.
                last_in_sequence = self.prev_op[cursor]
                statement_offsets.add(last_in_sequence)
                sequence_stmts.add(last_in_sequence)

    # The offsets to walk over: everything gathered so far, in order.
    if sequence_stmts:
        ordered_offsets = sorted(statement_offsets)
    else:
        ordered_offsets = candidates

    self.next_stmt = next_stmt = []
    previous_stmt = -1
    filled_upto = 0
    jump_absolute = self.opc.JUMP_ABSOLUTE

    for stmt_offset in ordered_offsets:
        stmt_op = bytecode[stmt_offset]

        # Process absolute jumps, but do not remove 'pass' statements
        # from the set
        if stmt_op == jump_absolute and stmt_offset not in sequence_stmts:
            # A forward absolute jump, or one on the same line as the
            # previous statement, is not a statement.
            # FIXME: 0 isn't always correct
            target = self.get_target(stmt_offset)
            if (
                target > stmt_offset
                or self.lines[previous_stmt].l_no == self.lines[stmt_offset].l_no
            ):
                statement_offsets.remove(stmt_offset)
                continue
            # Rewind ops till we encounter a non-JUMP_ABSOLUTE one
            back = self.prev_op[stmt_offset]
            while bytecode[back] == jump_absolute:
                back = self.prev_op[back]
            # If we got here, then it's list comprehension which
            # is not a statement too
            if bytecode[back] == self.opc.LIST_APPEND:
                statement_offsets.remove(stmt_offset)
                continue
        # Exclude ROT_TWO + POP_TOP
        elif (
            stmt_op == self.opc.POP_TOP
            and bytecode[self.prev_op[stmt_offset]] == self.opc.ROT_TWO
        ):
            statement_offsets.remove(stmt_offset)
            continue
        # Exclude FOR_ITER + designators
        elif stmt_op in self.designator_ops:
            back = self.prev_op[stmt_offset]
            while bytecode[back] in self.designator_ops:
                back = self.prev_op[back]
            if bytecode[back] == self.opc.FOR_ITER:
                statement_offsets.remove(stmt_offset)
                continue

        # Every offset from the previous statement start up to this one
        # maps to this statement's offset.
        next_stmt.extend([stmt_offset] * (stmt_offset - filled_upto))
        previous_stmt = stmt_offset
        filled_upto = stmt_offset

    # Finish filling the list for last statement
    next_stmt.extend([code_len] * (code_len - len(next_stmt)))
|
|
|
|
def detect_control_flow(self, offset, targets, inst_index):
    """
    Detect type of block structures and their boundaries to fix optimized jumps
    in python2.3+

    The instruction examined is ``self.insts[inst_index]``; ``offset`` is
    used throughout as that instruction's bytecode offset.  Depending on
    the opcode, results are recorded on ``self``:

    - ``self.structs`` gets dicts with "type", "start" and "end" keys
      (types added here: "<loop>-loop", "<loop>-else", "and/or",
      "if-then" and "else").
    - ``self.fixed_jumps`` maps a jump's offset to the offset chosen here
      as its (possibly restricted) target.
    - ``self.loops``, ``self.setup_loops``, ``self.ignore_if``,
      ``self.not_continue``, ``self.return_end_ifs``, ``self.else_start``
      and ``self.except_targets`` are updated as side effects.

    ``targets`` is indexed by offset and each value is iterated as a
    collection of offsets into ``self.code`` (presumably the instructions
    that jump to that offset -- confirm against the caller).

    Nothing is returned; every ``return`` below is a bare early exit.
    """

    code = self.code
    inst = self.insts[inst_index]
    op = inst.opcode

    # Detect parent structure
    parent = self.structs[0]
    start = parent["start"]
    end = parent["end"]

    # Pick inner-most parent for our offset: a struct that contains
    # ``offset`` and lies within the best candidate found so far.
    for struct in self.structs:
        current_start = struct["start"]
        current_end = struct["end"]
        if (current_start <= offset < current_end) and (
            current_start >= start and current_end <= end
        ):
            start = current_start
            end = current_end
            parent = struct

    if self.version < (3, 8) and op == self.opc.SETUP_LOOP:
        # We categorize loop types: 'for', 'while', 'while 1' with
        # possibly suffixes '-loop' and '-else'
        # Try to find the jump_back instruction of the loop.
        # It could be a return instruction.

        start += inst.inst_size
        target = self.get_target(offset)
        end = self.restrict_to_parent(target, parent)
        self.setup_loops[target] = offset

        if target != end:
            self.fixed_jumps[offset] = end

        # NOTE(review): line_no is unpacked but not used in this function.
        (line_no, next_line_byte) = self.lines[offset]
        jump_back = self.last_instr(
            start, end, self.opc.JUMP_ABSOLUTE, next_line_byte, False
        )

        if jump_back:
            jump_forward_offset = xdis.next_offset(
                code[jump_back], self.opc, jump_back
            )
        else:
            jump_forward_offset = None

        # Two steps back from ``end`` through self.prev.
        return_val_offset1 = self.prev[self.prev[end]]

        if (
            jump_back
            and jump_back != self.prev_op[end]
            and self.is_jump_forward(jump_forward_offset)
        ):
            # Discard the JUMP_ABSOLUTE candidate when the loop body ends
            # in RETURN_VALUE (optionally followed by POP_BLOCK).
            if code[self.prev_op[end]] == self.opc.RETURN_VALUE or (
                code[self.prev_op[end]] == self.opc.POP_BLOCK
                and code[return_val_offset1] == self.opc.RETURN_VALUE
            ):
                jump_back = None
        if not jump_back:
            # loop suite ends in return
            jump_back = self.last_instr(start, end, self.opc.RETURN_VALUE)
            if not jump_back:
                return

            jb_inst = self.get_inst(jump_back)
            jump_back = self.next_offset(jb_inst.opcode, jump_back)

            if_offset = None
            if code[self.prev_op[next_line_byte]] not in self.pop_jump_tf:
                if_offset = self.prev[next_line_byte]
            if if_offset:
                loop_type = "while"
                self.ignore_if.add(if_offset)
            else:
                loop_type = "for"
            target = next_line_byte
            end = xdis.next_offset(code[jump_back], self.opc, jump_back)
        else:
            if self.get_target(jump_back) >= next_line_byte:
                jump_back = self.last_instr(
                    start, end, self.opc.JUMP_ABSOLUTE, start, False
                )

            jb_inst = self.get_inst(jump_back)

            jb_next_offset = self.next_offset(jb_inst.opcode, jump_back)
            if end > jb_next_offset and self.is_jump_forward(end):
                if self.is_jump_forward(jb_next_offset):
                    if self.get_target(jb_next_offset) == self.get_target(end):
                        self.fixed_jumps[offset] = jb_next_offset
                        end = jb_next_offset
            elif target < offset:
                self.fixed_jumps[offset] = jb_next_offset
                end = jb_next_offset

            target = self.get_target(jump_back)

            if code[target] in (self.opc.FOR_ITER, self.opc.GET_ITER):
                loop_type = "for"
            else:
                loop_type = "while"
                test = self.prev_op[next_line_byte]

                if test == offset:
                    loop_type = "while 1"
                elif self.code[test] in self.opc.JUMP_OPs:
                    self.ignore_if.add(test)
                    test_target = self.get_target(test)
                    # NOTE(review): the literal 3 looks like a fixed
                    # instruction size -- confirm for all handled versions.
                    if test_target > (jump_back + 3):
                        jump_back = test_target
            self.not_continue.add(jump_back)
        self.loops.append(target)
        self.structs.append(
            {"type": loop_type + "-loop", "start": target, "end": jump_back}
        )
        # Anything between the instruction after jump_back and ``end`` is
        # recorded as the loop's "else" part.
        after_jump_offset = xdis.next_offset(code[jump_back], self.opc, jump_back)
        if after_jump_offset != end:
            self.structs.append(
                {
                    "type": loop_type + "-else",
                    "start": after_jump_offset,
                    "end": end,
                }
            )
    elif op in self.pop_jump_tf:
        start = offset + inst.inst_size
        target = inst.argval
        rtarget = self.restrict_to_parent(target, parent)
        prev_op = self.prev_op

        # Do not let jump to go out of parent struct bounds
        if target != rtarget and parent["type"] == "and/or":
            self.fixed_jumps[offset] = rtarget
            return

        # Does this jump to right after another conditional jump that is
        # not myself?  If so, it's part of a larger conditional.
        # rocky: if we have a conditional jump to the next instruction, then
        # possibly I am "skipping over" a "pass" or null statement.
        pretarget = self.get_inst(prev_op[target])

        if (
            pretarget.opcode in self.pop_jump_if_pop
            and (target > offset)
            and pretarget.offset != offset
        ):
            # FIXME: hack upon hack...
            # In some cases the pretarget can be a jump to the next instruction
            # and these aren't and/or's either. We limit to 3.5+ since we experienced there
            # but it might be earlier versions, or might be a general principle.
            if self.version < (3, 5) or pretarget.argval != target:
                # FIXME: this is not accurate. The commented-out line below
                # is what it should be. However grammar rules right now
                # assume the incorrect offsets.
                # self.fixed_jumps[offset] = target
                self.fixed_jumps[offset] = pretarget.offset
                self.structs.append(
                    {"type": "and/or", "start": start, "end": pretarget.offset}
                )
                return

        # The opcode *two* instructions before the target jump offset is important
        # in making a determination of what we have. Save that.
        pre_rtarget = prev_op[rtarget]

        # Is it an "and" inside an "if" or "while" block
        if op == self.opc.POP_JUMP_IF_FALSE:
            # Search for another POP_JUMP_IF_FALSE targeting the same op,
            # in current statement, starting from current offset, and filter
            # everything inside inner 'or' jumps and midline ifs
            match = self.rem_or(
                start, self.next_stmt[offset], self.opc.POP_JUMP_IF_FALSE, target
            )

            # FIXME: Remove this whole "if" block
            # If we still have any offsets in set, start working on it
            if match:
                is_jump_forward = self.is_jump_forward(pre_rtarget)
                if (
                    is_jump_forward
                    and pre_rtarget not in self.stmts
                    and self.restrict_to_parent(
                        self.get_target(pre_rtarget), parent
                    )
                    == rtarget
                ):
                    # The first two arms below only test conditions and
                    # then do nothing; when either holds, control falls
                    # through to the code after this if/else chain.
                    if (
                        code[prev_op[pre_rtarget]] == self.opc.JUMP_ABSOLUTE
                        and self.remove_mid_line_ifs([offset])
                        and target == self.get_target(prev_op[pre_rtarget])
                        and (
                            prev_op[pre_rtarget] not in self.stmts
                            or self.get_target(prev_op[pre_rtarget])
                            > prev_op[pre_rtarget]
                        )
                        and 1
                        == len(
                            self.remove_mid_line_ifs(
                                self.rem_or(
                                    start,
                                    prev_op[pre_rtarget],
                                    self.pop_jump_tf,
                                    target,
                                )
                            )
                        )
                    ):
                        pass
                    elif (
                        code[prev_op[pre_rtarget]] == self.opc.RETURN_VALUE
                        and self.remove_mid_line_ifs([offset])
                        and 1
                        == (
                            len(
                                set(
                                    self.remove_mid_line_ifs(
                                        self.rem_or(
                                            start,
                                            prev_op[pre_rtarget],
                                            self.pop_jump_tf,
                                            target,
                                        )
                                    )
                                )
                                | set(
                                    self.remove_mid_line_ifs(
                                        self.rem_or(
                                            start,
                                            prev_op[pre_rtarget],
                                            (
                                                self.opc.POP_JUMP_IF_FALSE,
                                                self.opc.POP_JUMP_IF_TRUE,
                                                self.opc.JUMP_ABSOLUTE,
                                            ),
                                            pre_rtarget,
                                            True,
                                        )
                                    )
                                )
                            )
                        )
                    ):
                        pass
                    elif self.version <= (3, 2):
                        fix = None
                        jump_ifs = self.inst_matches(
                            start,
                            self.next_stmt[offset],
                            self.opc.POP_JUMP_IF_FALSE,
                        )
                        last_jump_good = True
                        for j in jump_ifs:
                            if target == self.get_target(j):
                                # FIXME: remove magic number
                                if self.lines[j].next == j + 3 and last_jump_good:
                                    fix = j
                                    break
                            else:
                                last_jump_good = False
                        self.fixed_jumps[offset] = fix or match[-1]
                        return
                else:
                    if self.version < (3, 6):
                        # FIXME: this is putting in COME_FROMs in the wrong place.
                        # Fix up grammar so we don't need to do this.
                        # See cf_for_iter use in parser36.py
                        self.fixed_jumps[offset] = match[-1]
                    elif target > offset:
                        # Right now we only add COME_FROMs in forward (not loop) jumps
                        self.fixed_jumps[offset] = target
                    return
        # op == POP_JUMP_IF_TRUE
        else:
            # NOTE(review): this local shadows the builtin ``next`` for the
            # rest of the function.
            next = self.next_stmt[offset]
            if prev_op[next] == offset:
                pass
            elif self.is_jump_forward(next) and target == self.get_target(next):
                if code[prev_op[next]] == self.opc.POP_JUMP_IF_FALSE:
                    if (
                        code[next] == self.opc.JUMP_FORWARD
                        or target != rtarget
                        or code[prev_op[pre_rtarget]]
                        not in (self.opc.JUMP_ABSOLUTE, self.opc.RETURN_VALUE)
                    ):
                        self.fixed_jumps[offset] = prev_op[next]
                        return
            elif (
                code[next] == self.opc.JUMP_ABSOLUTE
                and self.is_jump_forward(target)
                and self.get_target(target) == self.get_target(next)
            ):
                self.fixed_jumps[offset] = prev_op[next]
                return

        # Don't add a struct for a while test, it's already taken care of
        if offset in self.ignore_if:
            return

        rtarget_is_ja = code[pre_rtarget] == self.opc.JUMP_ABSOLUTE
        # NOTE(review): ``rtarget + 3`` below hard-codes an instruction
        # size -- confirm for all handled bytecode versions.
        if (
            rtarget_is_ja
            and pre_rtarget in self.stmts
            and pre_rtarget != offset
            and prev_op[pre_rtarget] != offset
            and not (
                code[rtarget] == self.opc.JUMP_ABSOLUTE
                and code[rtarget + 3] == self.opc.POP_BLOCK
                and code[prev_op[pre_rtarget]] != self.opc.JUMP_ABSOLUTE
            )
        ):
            rtarget = pre_rtarget

        # Does the "jump if" jump beyond a jump op?
        # That is, we have something like:
        #  POP_JUMP_IF_FALSE HERE
        #  ...
        # JUMP_FORWARD
        # HERE:
        #
        # If so, this can be block inside an "if" statement
        # or a conditional assignment like:
        #   x = 1 if x else 2
        #
        # For 3.5, in addition the JUMP_FORWARD above we could have
        # JUMP_BACK or CONTINUE
        #
        # There are other situations we may need to consider, like
        # if the condition jump is to a forward location.
        # Also the existence of a jump to the instruction after "END_FINALLY"
        # will distinguish "try/else" from "try".
        if self.version < (3, 8):
            rtarget_break = (self.opc.RETURN_VALUE, self.opc.BREAK_LOOP)
        else:
            rtarget_break = (self.opc.RETURN_VALUE,)

        if self.is_jump_forward(pre_rtarget) or (
            rtarget_is_ja and self.version >= (3, 5)
        ):
            if_end = self.get_target(pre_rtarget)

            # If the jump target is back, we are looping
            if (
                if_end < pre_rtarget
                and self.version < (3, 8)
                and (code[prev_op[if_end]] == self.opc.SETUP_LOOP)
            ):
                if if_end > start:
                    return

            end = self.restrict_to_parent(if_end, parent)

            self.structs.append(
                {"type": "if-then", "start": start, "end": pre_rtarget}
            )

            # FIXME: add this
            # self.fixed_jumps[offset] = rtarget
            self.not_continue.add(pre_rtarget)

            if rtarget < end and (
                code[rtarget] not in (self.opc.END_FINALLY, self.opc.JUMP_ABSOLUTE)
                and code[prev_op[pre_rtarget]]
                not in (self.opc.POP_EXCEPT, self.opc.END_FINALLY)
            ):
                self.structs.append({"type": "else", "start": rtarget, "end": end})
                self.else_start[rtarget] = end
        elif self.is_jump_back(pre_rtarget, 0):
            # NOTE(review): if_end is assigned here but not read again
            # on this path.
            if_end = rtarget
            self.structs.append(
                {"type": "if-then", "start": start, "end": pre_rtarget}
            )
            self.not_continue.add(pre_rtarget)
        elif code[pre_rtarget] in rtarget_break:
            self.structs.append({"type": "if-then", "start": start, "end": rtarget})
            # It is important to distinguish if this return is inside some sort
            # except block return
            jump_prev = prev_op[offset]
            if self.is_pypy and code[jump_prev] == self.opc.COMPARE_OP:
                if self.opc.cmp_op[code[jump_prev + 1]] == "exception-match":
                    return
            if self.version >= (3, 5):
                # Python 3.5 may remove as dead code a JUMP
                # instruction after a RETURN_VALUE. So we check
                # based on seeing SETUP_EXCEPT various places.
                if self.version < (3, 6) and code[rtarget] == self.opc.SETUP_EXCEPT:
                    return
                # Check that next instruction after pops and jump is
                # not from SETUP_EXCEPT
                next_op = rtarget
                if code[next_op] == self.opc.POP_BLOCK:
                    next_op += instruction_size(self.code[next_op], self.opc)
                if code[next_op] == self.opc.JUMP_ABSOLUTE:
                    next_op += instruction_size(self.code[next_op], self.opc)
                if next_op in targets:
                    for try_op in targets[next_op]:
                        come_from_op = code[try_op]
                        if (
                            self.version < (3, 8)
                            and come_from_op == self.opc.SETUP_EXCEPT
                        ):
                            return
                        pass
                pass

            if self.version >= (3, 4):
                self.fixed_jumps[offset] = rtarget

            if code[pre_rtarget] == self.opc.RETURN_VALUE:
                # If we are at some sort of POP_JUMP_IF and the instruction before was
                # COMPARE_OP exception-match, then pre_rtarget is not an end_if
                if not (
                    inst_index > 0
                    and self.insts[inst_index - 1].argval == "exception-match"
                ):
                    self.return_end_ifs.add(pre_rtarget)
            else:
                self.fixed_jumps[offset] = rtarget
                self.not_continue.add(pre_rtarget)
        else:
            # FIXME: this is very convoluted and based on rather hacky
            # empirical evidence. It should go away when
            # we have better control-flow analysis
            normal_jump = self.version >= (3, 6)
            if self.version[:2] == (3, 5):
                j = self.offset2inst_index[target]
                if j + 2 < len(self.insts) and self.insts[j + 2].is_jump_target:
                    normal_jump = self.insts[j + 1].opname == "POP_BLOCK"

            if normal_jump:
                # For now, we'll only tag forward jump.
                if target > offset:
                    self.fixed_jumps[offset] = target
                    pass
            else:
                # FIXME: This is probably a bug in < 3.5 and we should
                # instead use the above code. But until we smoke things
                # out we'll stick with it.
                if rtarget > offset:
                    self.fixed_jumps[offset] = rtarget

    elif self.version < (3, 8) and op == self.opc.SETUP_EXCEPT:
        target = self.get_target(offset)
        end = self.restrict_to_parent(target, parent)
        self.fixed_jumps[offset] = end
    elif op == self.opc.POP_EXCEPT:
        # Look at the instruction right after POP_EXCEPT: a forward
        # JUMP_ABSOLUTE not immediately followed by END_FINALLY is
        # recorded both as a fixed jump and as an except target.
        next_offset = xdis.next_offset(op, self.opc, offset)
        target = self.get_target(next_offset)
        if target > next_offset:
            next_op = code[next_offset]
            if (
                self.opc.JUMP_ABSOLUTE == next_op
                and self.opc.END_FINALLY
                != code[xdis.next_offset(next_op, self.opc, next_offset)]
            ):
                self.fixed_jumps[next_offset] = target
                self.except_targets[target] = next_offset

    elif op == self.opc.SETUP_FINALLY:
        target = self.get_target(offset)
        end = self.restrict_to_parent(target, parent)
        self.fixed_jumps[offset] = end
    elif op in self.jump_if_pop:
        target = self.get_target(offset)
        if target > offset:
            unop_target = self.last_instr(
                offset, target, self.opc.JUMP_FORWARD, target
            )
            # NOTE(review): ``unop_target + 3`` hard-codes an instruction
            # size -- confirm for all handled bytecode versions.
            if unop_target and code[unop_target + 3] != self.opc.ROT_TWO:
                self.fixed_jumps[offset] = unop_target
            else:
                self.fixed_jumps[offset] = self.restrict_to_parent(target, parent)
                pass
            pass
    elif self.version >= (3, 5):
        # 3.5+ has Jump optimization which too often causes RETURN_VALUE to get
        # misclassified as RETURN_END_IF. Handle that here.
        # In RETURN_VALUE, JUMP_ABSOLUTE, RETURN_VALUE is never RETURN_END_IF
        if op == self.opc.RETURN_VALUE:
            next_offset = xdis.next_offset(op, self.opc, offset)
            if next_offset < len(code) and (
                code[next_offset] == self.opc.JUMP_ABSOLUTE
                and offset in self.return_end_ifs
            ):
                self.return_end_ifs.remove(offset)
                pass
            pass
        elif op == self.opc.JUMP_FORWARD:
            # If we have:
            #   JUMP_FORWARD x, [non-jump, insns], RETURN_VALUE, x:
            # then RETURN_VALUE is not RETURN_END_IF
            rtarget = self.get_target(offset)
            rtarget_prev = self.prev[rtarget]
            if (
                code[rtarget_prev] == self.opc.RETURN_VALUE
                and rtarget_prev in self.return_end_ifs
            ):
                # Walk backwards from the RETURN_VALUE to this jump; bail
                # out if any other jump sits in between.
                i = rtarget_prev
                while i != offset:
                    if code[i] in [op3.JUMP_FORWARD, op3.JUMP_ABSOLUTE]:
                        return
                    i = self.prev[i]
                self.return_end_ifs.remove(rtarget_prev)
            pass
    return
|
|
|
|
def is_jump_back(self, offset, extended_arg):
    """
    Return True if the instruction at ``offset`` is a backward jump.

    That is, the opcode there is JUMP_ABSOLUTE and the target computed by
    ``self.get_target(offset, extended_arg)`` lies before ``offset``.
    Any other opcode yields False without computing a target.
    """
    is_absolute_jump = self.code[offset] == self.opc.JUMP_ABSOLUTE
    return is_absolute_jump and self.get_target(offset, extended_arg) < offset
|
|
|
|
def next_except_jump(self, start):
    """
    Return the next jump that was generated by an ``except SomeException:``
    construct in a try...except...else clause, or None if not found.

    The offset returned is also added to ``self.not_continue``.  When the
    code at ``start`` is DUP_TOP and a POP_JUMP_IF_FALSE is found after it,
    that conditional jump is added to ``self.ignore_if`` as well.
    """
    bytecode = self.code
    code_end = len(bytecode)

    if bytecode[start] == self.opc.DUP_TOP:
        match_jump = self.first_instr(start, code_end, self.opc.POP_JUMP_IF_FALSE)
        if match_jump:
            handler_jump = self.prev_op[self.get_target(match_jump)]
            self.ignore_if.add(match_jump)
            self.not_continue.add(handler_jump)
            return handler_jump

    # Number of setup instructions seen so far that have not yet been
    # balanced by an END_FINALLY.
    open_setups = 0
    for pos in self.op_range(start, code_end):
        opcode = bytecode[pos]
        if opcode == self.opc.END_FINALLY:
            if open_setups == 0:
                before = self.prev_op[pos]
                assert bytecode[before] in frozenset(
                    [
                        self.opc.JUMP_ABSOLUTE,
                        self.opc.JUMP_FORWARD,
                        self.opc.RETURN_VALUE,
                    ]
                )
                self.not_continue.add(before)
                return before
            open_setups -= 1
        elif opcode in self.setup_opts_no_loop:
            open_setups += 1
    return None
|
|
|
|
def rem_or(self, start, end, instr, target=None, include_beyond_target=False):
    """
    Collect the offsets of every requested <instr> between <start> and
    <end> (optionally only those <target>ing a given offset), then drop
    the ones that sit inside the span of an "or" jump.

    An "or" jump is JUMP_IF_TRUE on 3.0 and POP_JUMP_IF_TRUE otherwise.
    For each such jump, an offset is kept only if it is at or before the
    jump itself, or at or beyond three bytes before the jump's target.
    """
    assert 0 <= start <= end <= len(self.code)

    # Find all offsets of requested instructions
    candidates = self.inst_matches(start, end, instr, target, include_beyond_target)

    # Get all "or" jump offsets
    if self.version[:2] == (3, 0):
        or_jump_op = self.opc.JUMP_IF_TRUE
    else:
        or_jump_op = self.opc.POP_JUMP_IF_TRUE

    for or_jump in self.inst_matches(start, end, or_jump_op):
        span_end = self.get_target(or_jump) - 3
        candidates = [
            found for found in candidates if found <= or_jump or found >= span_end
        ]
    return candidates
|