From 2db50971ee482782c29fa2b01512cb519b8e5fdb Mon Sep 17 00:00:00 2001 From: Gulshan Singh Date: Sun, 27 Nov 2022 19:26:29 -0800 Subject: [PATCH] Add support for Android binder --- pwndbg/commands/__init__.py | 1 + pwndbg/commands/binder.py | 373 +++++++++++++++++++++++++++++++++ pwndbg/gdblib/backtrace.py | 31 +++ pwndbg/gdblib/kernel/rbtree.py | 108 ++++++++++ 4 files changed, 513 insertions(+) create mode 100644 pwndbg/commands/binder.py create mode 100644 pwndbg/gdblib/backtrace.py create mode 100644 pwndbg/gdblib/kernel/rbtree.py diff --git a/pwndbg/commands/__init__.py b/pwndbg/commands/__init__.py index f019c3fb58..7510d2426e 100644 --- a/pwndbg/commands/__init__.py +++ b/pwndbg/commands/__init__.py @@ -587,6 +587,7 @@ def load_commands() -> None: import pwndbg.commands.aslr import pwndbg.commands.attachp import pwndbg.commands.auxv + import pwndbg.commands.binder import pwndbg.commands.canary import pwndbg.commands.checksec import pwndbg.commands.comments diff --git a/pwndbg/commands/binder.py b/pwndbg/commands/binder.py new file mode 100644 index 0000000000..ae0213b504 --- /dev/null +++ b/pwndbg/commands/binder.py @@ -0,0 +1,373 @@ +import argparse +from typing import Any +from typing import Optional + +import gdb + +import pwndbg.color as C +import pwndbg.commands +import pwndbg.gdblib.symbol +from pwndbg.commands import CommandCategory +from pwndbg.gdblib.kernel.macros import container_of +from pwndbg.gdblib.kernel.macros import for_each_entry +from pwndbg.gdblib.kernel.macros import offset_of +from pwndbg.gdblib.kernel.rbtree import for_each_rb_entry + +addrc = C.green +fieldnamec = C.blue +fieldvaluec = C.yellow +typenamec = C.red + + +def for_each_transaction(addr, field): + typename = "struct binder_transaction" + offset = offset_of(typename, field) + + head = addr + + while addr != 0: + transaction = gdb.Value(addr).cast(gdb.lookup_type(typename).pointer()) + yield transaction + + addr = transaction[field] + + +# TODO: pull this out from the slab command so we can reuse it +class IndentContextManager: + def __init__(self): + self.indent = 0 + + def __enter__(self): + self.indent += 1 + + def __exit__(self, exc_type, exc_value, exc_tb): + self.indent -= 1 + assert self.indent >= 0 + + +node_types = { + "waiting_threads": "struct binder_thread", + "todo": "struct binder_work", + "refs": "struct binder_ref", + "threads": "struct binder_thread", + "nodes": "struct binder_node", + "refs_by_node": "struct binder_ref", +} + +entry_field_names = { + "waiting_threads": "waiting_thread_node", + "todo": "entry", + "refs": "node_entry", +} + +rb_node_field_names = { + "threads": "rb_node", + "nodes": "rb_node", + "refs_by_node": "rb_node_node", +} + +codenames = [x for x in dir(gdb) if "TYPE_CODE_" in x] +code_to_name = {getattr(gdb, name): name for name in codenames} +name_to_code = {v: k for k, v in code_to_name.items()} + + +# TODO: merge with for_each_entry? +def for_each_hlist_entry(head, typename, field): + addr = head["first"] + while addr != 0: + yield container_of(addr, typename, field) + addr = addr.dereference()["next"] + + +class BinderVisitor: + def __init__(self, procs_addr): + self.indent = IndentContextManager() + self.addr = pwndbg.gdblib.memory.poi(gdb.lookup_type("struct hlist_head"), procs_addr) + + def _format_indent(self, text: str) -> str: + return " " * self.indent.indent + text + + def _format_heading(self, typename: str, fields, addr: int): + hex_addr = hex(int(addr)) + return self._format_indent( + f"{fieldnamec(typename)} {fieldvaluec(fields)} ({addrc(hex_addr)})" + ) + + # TODO: do this in a cleaner, object-oriented way + def _format_field(self, field: Optional[str] = None, value: Any = "", only_heading=True): + if isinstance(value, gdb.Value): + t = value.type + if t.code == name_to_code["TYPE_CODE_TYPEDEF"]: + real_type = t.strip_typedefs() + + # We only want to replace the typedef with the real type if the + # real type is not an anonymous struct + if real_type.name is not None: + t = real_type + + if t.code == name_to_code["TYPE_CODE_INT"]: + value = int(value) + elif t.code == name_to_code["TYPE_CODE_BOOL"]: + value = True if value else False + elif t.code == name_to_code["TYPE_CODE_PTR"]: + typename = t.target().name + if int(value) == 0: + value = "NULL" + elif typename == "binder_proc": + value = self.format_proc(value, only_heading=only_heading).strip() + elif typename == "binder_thread": + value = self.format_thread(value, only_heading=only_heading).strip() + elif typename == "binder_node": + value = self.format_node(value).strip() + elif typename == "binder_ref": + value = self.format_ref(value, only_heading=only_heading).strip() + elif typename == "binder_work": + value = self.format_work(value.dereference()).strip() + elif typename == "binder_transaction": + value = self.format_transaction(value, only_heading=only_heading) + if only_heading: + value = value.strip() + else: + value = "\n" + "\n".join([" " + l for l in value.split("\n")]) + else: + print(f"Warning: no formatter for pointer type {typename}") + elif t.code in [name_to_code["TYPE_CODE_STRUCT"], name_to_code["TYPE_CODE_TYPEDEF"]]: + typename = t.name + if typename == "spinlock": + value = self.format_spinlock(value).strip() + elif typename == "atomic_t": + value = value["counter"] + elif typename == "rb_root": + assert field is not None + value, num_elts = self.format_rb_tree(field, value) + field += f" [{num_elts}]" + elif typename in ["list_head", "hlist_head"]: + assert field is not None + value, num_elts = self.format_list(field, value, typename) + field += f" [{num_elts}]" + else: + print(f"Warning: no formatter for type {typename}") + print(t) + else: + print(f"Warning: no formatter for type code {t.code}") + + output = "" + if field: + output += fieldnamec(field) + ": " + output += fieldvaluec(value) + + return self._format_indent(output) + + def format_rb_tree(self, field, value): + res = [] + + node_type = node_types[field] + entry_field_name = rb_node_field_names[field] + + with self.indent: + for entry in for_each_rb_entry( + value, + node_type, + entry_field_name, + ): + s = self._format_field(value=entry, only_heading=False) + num_space = len(s) - len(s.lstrip()) + res.append(" " * num_space + "* " + s.lstrip()) + + if len(res) == 0: + return "EMPTY", 0 + + # Prepend a newline so the list starts on the line after the field name + return "\n" + "\n".join(res), len(res) + + def format_list(self, field, value, typename): + res = [] + + node_type = node_types[field] + entry_field_name = entry_field_names[field] + + if typename == "list_head": + each_entry = for_each_entry + elif typename == "hlist_head": + each_entry = for_each_hlist_entry + else: + assert False + + with self.indent: + for entry in each_entry( + value, + node_type, + entry_field_name, + ): + s = self._format_field(value=entry) + num_space = len(s) - len(s.lstrip()) + res.append(" " * num_space + "* " + s.lstrip()) + + if len(res) == 0: + return "EMPTY", 0 + + # Prepend a newline so the list starts on the line after the field name + return "\n" + "\n".join(res), len(res) + + def _format_fields(self, obj, fields, only_heading=True): + res = [] + for field in fields: + res.append(self._format_field(field, obj[field], only_heading=only_heading)) + return "\n".join(res) + + def visit(self): + for proc in for_each_hlist_entry(self.addr, "struct binder_proc", "proc_node"): + print(self.format_proc(proc)) + print() + + def format_proc(self, proc, only_heading=False): + res = [] + res.append(self._format_heading("binder_proc", "PID %s" % proc["pid"], proc)) + + if only_heading: + return "\n".join(res) + + with self.indent: + fields = [ + "is_dead", + "tmp_ref", + "inner_lock", + "outer_lock", + "waiting_threads", + "todo", + "threads", + "nodes", + "refs_by_node", + ] + res.append(self._format_fields(proc, fields)) + + return "\n".join(res) + + def format_thread(self, thread, only_heading=False): + res = [] + res.append(self._format_heading("binder_thread", "PID %s" % thread["pid"], thread)) + + if only_heading: + return "\n".join(res) + + with self.indent: + fields = ["tmp_ref", "looper_need_return", "process_todo", "is_dead", "todo"] + res.append(self._format_fields(thread, fields)) + + # We need to print this separately since we wanted print the entire + # object and not just the heading + res.append(self._format_fields(thread, ["transaction_stack"], only_heading=False)) + + return "\n".join(res) + + def format_transaction(self, transaction, only_heading=False): + res = [] + res.append( + self._format_heading( + "binder_transaction", "ID %d" % transaction["debug_id"], transaction + ) + ) + + if only_heading: + return "\n".join(res) + + with self.indent: + res.append(self._format_fields(transaction, ["lock", "to_proc", "from", "to_thread"])) + + if int(transaction["from_parent"]) == 0: + res.append(self._format_field("from_parent", "NULL")) + else: + res.append(self._format_field("from_parent")) + with self.indent: + for transaction in for_each_transaction( + transaction["from_parent"], "from_parent" + ): + res.append(self.format_transaction(transaction)) + + if int(transaction["to_parent"]) == 0: + res.append(self._format_field("to_parent", "NULL")) + else: + res.append(self._format_field("to_parent")) + with self.indent: + for transaction in for_each_transaction(transaction["to_parent"], "to_parent"): + res.append(self.format_transaction(transaction)) + + return "\n".join(res) + + def format_node(self, node): + res = [] + res.append(self._format_heading("binder_node", "", node)) + with self.indent: + fields = [ + "lock", + "internal_strong_refs", + "local_weak_refs", + "local_strong_refs", + "tmp_refs", + "refs", + ] + res.append(self._format_fields(node, fields)) + + return "\n".join(res) + + def format_ref(self, ref, only_heading=False): + res = [] + res.append(self._format_heading("binder_ref", "HANDLE %s" % ref["data"]["desc"], ref)) + + if only_heading: + return "\n".join(res) + + with self.indent: + fields = ["strong", "weak"] + res.append(self._format_fields(ref["data"], fields)) + + return "\n".join(res) + + def format_work(self, work): + res = [] + res.append(self._format_heading("binder_work", work["type"], work.address)) + + t = int(work["type"]) + # TODO: Create enum + if t == 1: + obj = container_of(work.address, "struct binder_transaction", "work") + elif t in [2, 3]: + return "\n".join(res) # These are just binder_work objects + elif t == 4: + obj = container_of(work.address, "struct binder_error", "work") + elif t == 5: + obj = container_of(work.address, "struct binder_node", "work") + elif t in [6, 7, 8]: + obj = container_of(work.address, "struct binder_ref_death", "work") + else: + assert False + + with self.indent: + res.append(self._format_field(value=obj)) + + return "\n".join(res) + + def print_object(self, object): + # TODO: type + print(object) + + def format_spinlock(self, lock: gdb.Value) -> str: + raw_lock = lock["rlock"]["raw_lock"] + val = pwndbg.gdblib.memory.ushort(raw_lock.address) + locked = val & 0xFF + pending = val >> 8 + + return self._format_heading("", f"LOCKED: {locked} PENDING: {pending}", int(lock.address)) + + +parser = argparse.ArgumentParser(description="Show Android Binder information") + + +@pwndbg.commands.OnlyWhenQemuKernel +@pwndbg.commands.OnlyWithKernelDebugSyms +@pwndbg.commands.OnlyWhenPagingEnabled +@pwndbg.commands.ArgparsedCommand(parser, category=CommandCategory.KERNEL) +def binder(): + procs_addr = pwndbg.gdblib.symbol.address("binder_procs") + bv = BinderVisitor(procs_addr) + bv.visit() diff --git a/pwndbg/gdblib/backtrace.py b/pwndbg/gdblib/backtrace.py new file mode 100644 index 0000000000..139b1242e9 --- /dev/null +++ b/pwndbg/gdblib/backtrace.py @@ -0,0 +1,31 @@ +# gdb.selected_frame () +# gdb.newest_frame () +# Frame.pc () +# Frame.function () +# Frame.older () +# Frame.newer () +# Frame.select () +# Frame.read_var (variable [, block]) +from typing import List + +import gdb + +# class Frame: +# def __init__(self, frame): +# pass + +# def get_frames() -> List[Frame]: +# pass + +# def select_frame(num: int): +# pass + + +def get_function_frame(function_name: str): + f = gdb.selected_frame() + while f != None: + if f.function() != None and f.function().name == function_name: + break + f = f.older() + + return f diff --git a/pwndbg/gdblib/kernel/rbtree.py b/pwndbg/gdblib/kernel/rbtree.py new file mode 100644 index 0000000000..ab015246eb --- /dev/null +++ b/pwndbg/gdblib/kernel/rbtree.py @@ -0,0 +1,108 @@ +import gdb + +from pwndbg.gdblib.events import new_objfile +from pwndbg.gdblib.kernel.macros import container_of + +rb_root_type = None +rb_node_type = None + + +@new_objfile +def init(): + global rb_root_type, rb_node_type + rb_root_type = gdb.lookup_type("struct rb_root") + rb_node_type = gdb.lookup_type("struct rb_node") + + +def for_each_rb_entry(root, typename, fieldname): + node = rb_first(root) + while node is not None and node != 0: + yield container_of(node, typename, fieldname) + node = rb_next(node) + + +def rb_first(root): + if root.type == rb_root_type: + node = root.address.cast(rb_root_type.pointer()) + elif root.type != rb_root_type.pointer(): + raise gdb.GdbError("Must be struct rb_root not {}".format(root.type)) + + node = root["rb_node"] + if node == 0: + return None + + while node["rb_left"]: + node = node["rb_left"] + + return node + + +def rb_last(root): + if root.type == rb_root_type: + node = root.address.cast(rb_root_type.pointer()) + elif root.type != rb_root_type.pointer(): + raise gdb.GdbError("Must be struct rb_root not {}".format(root.type)) + + node = root["rb_node"] + if node == 0: + return None + + while node["rb_right"]: + node = node["rb_right"] + + return node + + +def rb_parent(node): + parent = gdb.Value(node["__rb_parent_color"] & ~3) + return parent.cast(rb_node_type.pointer()) + + +def rb_empty_node(node): + return node["__rb_parent_color"] == node.address + + +def rb_next(node): + if node.type == rb_node_type: + node = node.address.cast(rb_node_type.pointer()) + elif node.type != rb_node_type.pointer(): + raise gdb.GdbError("Must be struct rb_node not {}".format(node.type)) + + if rb_empty_node(node): + return None + + if node["rb_right"]: + node = node["rb_right"] + while node["rb_left"]: + node = node["rb_left"] + return node + + parent = rb_parent(node) + while parent and node == parent["rb_right"]: + node = parent + parent = rb_parent(node) + + return parent + + +def rb_prev(node): + if node.type == rb_node_type: + node = node.address.cast(rb_node_type.pointer()) + elif node.type != rb_node_type.pointer(): + raise gdb.GdbError("Must be struct rb_node not {}".format(node.type)) + + if rb_empty_node(node): + return None + + if node["rb_left"]: + node = node["rb_left"] + while node["rb_right"]: + node = node["rb_right"] + return node.dereference() + + parent = rb_parent(node) + while parent and node == parent["rb_left"].dereference(): + node = parent + parent = rb_parent(node) + + return parent