From 0e028487596091afb3ea1035f7f78ef7661c9c6e Mon Sep 17 00:00:00 2001 From: Malfurious Date: Fri, 12 Jan 2024 06:34:38 -0500 Subject: rev: Expose modules' contents through package This is the start of an overarching change meant to simplify sploit library imports. In general, all packages (directories) are intended to export all the classes, methods, and variables of their contained modules. This way users need only import the package, which leads to less verbose import statements (and usually fewer import statements). We would still like to gate objects behind their respective packages, rather than providing the whole world with `from sploit import *` so that users can still have some amount of control over what is brought into their global namespace. Beware: For code internal to sploit, full module imports should probably continue to be used. Otherwise, there is a possibility for circular imports if two modules from two packages cross import. Signed-off-by: Malfurious --- sploit/rev/__init__.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'sploit') diff --git a/sploit/rev/__init__.py b/sploit/rev/__init__.py index 0d0dc9b..42e2f5b 100644 --- a/sploit/rev/__init__.py +++ b/sploit/rev/__init__.py @@ -1,6 +1,4 @@ -from . import ( - elf, - gadget, - ldd, - r2, -) +from .elf import * +from .gadget import * +from .ldd import * +from .r2 import * -- cgit v1.2.3 From 4a39e37e19fe02be9169d8a37ee09190acd09527 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Fri, 12 Jan 2024 07:37:53 -0500 Subject: builder: Rename package to payload and expose contents This follows in the package contents export change. Additionally, the builder package is renamed to "payload". "payload" is actually the preferred name of this package. It was previously renamed due to the absurdity of importing "sploit.payload.payload.Payload()", and the fact that additional modules were being bundled together so a more broad name _seemed_ desirable. Signed-off-by: Malfurious --- sploit/__init__.py | 2 +- sploit/builder/__init__.py | 5 - sploit/builder/gadhint.py | 109 ------------- sploit/builder/payload.py | 94 ----------- sploit/builder/rop.py | 383 --------------------------------------------- sploit/payload/__init__.py | 3 + sploit/payload/gadhint.py | 109 +++++++++++++ sploit/payload/payload.py | 94 +++++++++++ sploit/payload/rop.py | 383 +++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 590 insertions(+), 592 deletions(-) delete mode 100644 sploit/builder/__init__.py delete mode 100644 sploit/builder/gadhint.py delete mode 100644 sploit/builder/payload.py delete mode 100644 sploit/builder/rop.py create mode 100644 sploit/payload/__init__.py create mode 100644 sploit/payload/gadhint.py create mode 100644 sploit/payload/payload.py create mode 100644 sploit/payload/rop.py (limited to 'sploit') diff --git a/sploit/__init__.py b/sploit/__init__.py index 1eb570c..f761ead 100644 --- a/sploit/__init__.py +++ b/sploit/__init__.py @@ -1,8 +1,8 @@ from sploit import ( arch, - builder, comm, log, + payload, rev, symtbl, until, diff --git a/sploit/builder/__init__.py b/sploit/builder/__init__.py deleted file mode 100644 index 758d511..0000000 --- a/sploit/builder/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from . import ( - gadhint, - payload, - rop, -) diff --git a/sploit/builder/gadhint.py b/sploit/builder/gadhint.py deleted file mode 100644 index 9b077fe..0000000 --- a/sploit/builder/gadhint.py +++ /dev/null @@ -1,109 +0,0 @@ -from dataclasses import dataclass, field -from sploit.rev.gadget import Gadget - -@dataclass -class GadHint: - """ - User-annotated gadget description object - - gadget (Gadget|int): The gadget being annotated. May be a Gadget object or - an offset as an int. - - pops (list[str]): The registers popped by this gadget, in order of - occurrence. - - movs (dict{str:str}): The register-to-register moves made by this gadget. - Keys are destination register names, values are source register names. The - order given is insignificant. - - imms (dict{str:int}): The immediate-to-register loads made by this gadget. - Keys are destination register names, values are immediate values. The order - given is insignificant. - - writes (dict{str:str}): The register-to-memory moves (stores) made by this - gadget. Keys are destination register names (expected to hold memory - locations), values are source register names (expected to hold direct - values). The order given is insignificant. - - requirements (dict{str:int}): The register state that is required before - this gadget should be executed. Keys are register names, values are the - required register values. - - stack (list[int]): A list of words to append to the stack following this - gadget. The first element given is nearest to the top of the stack and the - rest follow in order. - - align (bool): If True, this gadget expects the stack to be aligned prior - to entry. - - syscall (bool): If True, this gadget contains a syscall instruction. - - spm (int): "Stack pointer move" - The amount the stack pointer is adjusted - by this gadget. The effect of executing a terminating "return" instruction - should not be accounted for. A value of zero is taken as "unspecified". - """ - - gadget: int = 0 - pops: list = field(default_factory=list) - movs: dict = field(default_factory=dict) - imms: dict = field(default_factory=dict) - writes: dict = field(default_factory=dict) - requirements: dict = field(default_factory=dict) - stack: list = field(default_factory=list) - align: bool = False - syscall: bool = False - spm: int = 0 - - @property - def offset(self): - """Return gadget offset as an integer.""" - return int(self.gadget) - - def __index__(self): - """Convert object to integer using offset value.""" - return self.offset - - def __add__(self, x): - """Return new object with adjusted offset.""" - return GadHint(self.gadget + x, self.pops, self.movs, self.imms, - self.writes, self.requirements, self.stack, self.align, - self.syscall, self.spm) - - def __sub__(self, x): - """Return new object with adjusted offset.""" - return self + (-x) - - def with_requirements(self, reqs): - """Return new object with additional requirements.""" - for k, v in reqs.items(): - if self.requirements.get(k, v) != v: - raise ValueError( - f"GadHint: Conflicting gadget requirements: " - f"{self.requirements}, {reqs}") - - return GadHint(self.gadget, self.pops, self.movs, self.imms, - self.writes, self.requirements | reqs, self.stack, - self.align, self.syscall, self.spm) - - def __repr__(self): - """Return human-readable GadHint.""" - def fmt(name, prop): - if len(prop) > 0: - return f", {name}={prop}" - return "" - - s = hex(self.gadget) - s = f"Gadget({s})" if type(self.gadget) is Gadget else s - s += fmt("pops", self.pops) - s += fmt("movs", self.movs) - s += fmt("imms", self.imms) - s += fmt("writes", self.writes) - s += fmt("requirements", self.requirements) - s += fmt("stack", self.stack) - if self.align: - s += ", align" - if self.syscall: - s += ", syscall" - if self.spm > 0: - s += f", spm={self.spm}" - return f"GadHint({s})" diff --git a/sploit/builder/payload.py b/sploit/builder/payload.py deleted file mode 100644 index cf105c6..0000000 --- a/sploit/builder/payload.py +++ /dev/null @@ -1,94 +0,0 @@ -from sploit.arch import arch, itob -from sploit.symtbl import Symtbl - -class Payload: - MAGIC = b'\xef' - - def __init__(self, **kwargs): - self.payload = b'' - self.sym = Symtbl(**kwargs) - self.ctrs = {} - - def __len__(self): - return len(self.payload) - - def __call__(self, badbytes=b''): - found = [ hex(x) for x in set(self.payload).intersection(badbytes) ] - if len(found) > 0: - raise Exception(f'Payload: bad bytes in content: {found}') - return self.payload - - def _name(self, kind, sym): - if sym is not None: return sym - try: ctr = self.ctrs[kind] - except: ctr = 0 - self.ctrs[kind] = ctr + 1 - return f'{kind}_{ctr}' - - def _append(self, value, sym): - (self.sym @ 0)[sym] = len(self) - self.payload += value - return self - - def _prepend(self, value, sym): - self.sym >>= len(value) - (self.sym @ 0)[sym] = 0 - self.payload = value + self.payload - return self - - def end(self): - return self.sym.base + len(self) - - def bin(self, *values, sym=None): - return self._append(b''.join(values), sym=self._name('bin', sym)) - - def str(self, *values, sym=None): - values = [ v.encode() + b'\x00' for v in values ] - return self.bin(*values, sym=self._name('str', sym)) - - def int(self, *values, sym=None): - values = [ itob(v) for v in values ] - return self.bin(*values, sym=self._name('int', sym)) - - def int8(self, *values, sym=None): - values = [ itob(v, 1) for v in values ] - return self.bin(*values, sym=self._name('int', sym)) - - def int16(self, *values, sym=None): - values = [ itob(v, 2) for v in values ] - return self.bin(*values, sym=self._name('int', sym)) - - def int32(self, *values, sym=None): - values = [ itob(v, 4) for v in values ] - return self.bin(*values, sym=self._name('int', sym)) - - def int64(self, *values, sym=None): - values = [ itob(v, 8) for v in values ] - return self.bin(*values, sym=self._name('int', sym)) - - def ret(self, *values, sym=None): - return self.int(*values, sym=self._name('ret', sym)) - - def sbp(self, *values, sym=None): - if len(values) == 0: - return self.rep(self.MAGIC, arch.wordsize, sym=self._name('sbp', sym)) - return self.int(*values, sym=self._name('sbp', sym)) - - def rep(self, value, size, sym=None): - return self.bin(self._rep_helper(value, size), sym=self._name('rep', sym)) - - def pad(self, size, value=None, sym=None): - return self.bin(self._pad_helper(size, value), sym=self._name('pad', sym)) - - def pad_front(self, size, value=None, sym=None): - return self._prepend(self._pad_helper(size, value), sym=self._name('pad', sym)) - - def _rep_helper(self, value, size, *, explain=''): - if size < 0: - raise Exception(f'Payload: {explain}rep: available space is negative') - if (size := size / len(value)) != int(size): - raise Exception(f'Payload: {explain}rep: element does not divide the space evenly') - return value * int(size) - - def _pad_helper(self, size, value): - return self._rep_helper(value or arch.nopcode, size - len(self), explain='pad: ') diff --git a/sploit/builder/rop.py b/sploit/builder/rop.py deleted file mode 100644 index 7b58e0e..0000000 --- a/sploit/builder/rop.py +++ /dev/null @@ -1,383 +0,0 @@ -""" -ROP chain generation utilities - -This module contains tools for automating basic return-oriented-programming -workloads, such as loading register values and calling into arbitrary functions -or syscalls. The tools are currently designed to work on x86 (32 or 64 bit) -and ARM (32 bit only). - -The main appeal of the ROP class is the ability to abstract away the manual -construction of ROP chain data, and instead make declarative statements like -"call this function with these arguments." The ROP class will also utilize its -supplied binary objects to automatically find and use trivial gadgets. - -The user is able to provide annotations for more complicated gadgets, which help -instruct the class how to incorporate them into a ROP chain. This is done with -the GadHint dataclass. GadHint objects are provided to a ROP instance by -including them in the Symtbl of one of the binary objects it is constructed with. -If applicable, a user-supplied gadget will take precedence over automatic gadget -searching. - -See the GadHint class to learn more about the descriptive attributes that are -supported. -""" - -from graphlib import TopologicalSorter - -from sploit.arch import arch, btoi, itob -from sploit.builder.gadhint import GadHint -from sploit.builder.payload import Payload - -class ROP(Payload): - """ - ROP-enabled payload builder - - POP_MAGIC (int): Magic value used for pop instructions where no specific - value is required by the user. - - SPM_MAGIC (bytes): Magic value to fill the stack with when the best - available cleaning gadget is larger than is necessary. - - objects (list[ELF]): The binary objects this ROP instance will consider - for gadget searching. - - safe_syscalls (bool): If True, require that automatically found syscall - instructions are immediately followed by a return instruction. - - align_calls (bool): If True, ensure that the stack return address into - function calls is aligned according to the architecture alignment property. - Knowledge of alignment is taken from the instance Symtbl's base value. - - clean_stack (bool): If True, attempt to locate a cleaning gadget to "pop" - stack data that is leftover from a function call. Required if attempting - to make multiple calls that involve stack-based arguments. - """ - - POP_MAGIC = 0xdead - SPM_MAGIC = b'\x69' - - def __init__(self, *objects, safe_syscalls=True, align_calls=True, - clean_stack=True, **symbols): - """Initialize new ROP builder instance.""" - super().__init__(**symbols) - self.objects = objects - self.safe_syscalls = safe_syscalls - self.align_calls = align_calls - self.clean_stack = clean_stack - - def gadgets(self, *regexes, cont=False): - """Return a list of matching gadgets, considering all objects.""" - results = [] - for obj in self.objects: - results += obj.gadgets(*regexes, cont=cont) - return results - - def gadget(self, *regexes): - """Return the first matching gadget, considering all objects.""" - for obj in self.objects: - try: - return obj.gadget(*regexes) - except: - pass - raise LookupError( - f"ROP: Need to define gadget symbol for {'; '.join(regexes)}") - - def assign(self, *, sym=None, **sets): - """ - Insert a ROP chain to control given registers. - - **sets (str:int): Keyword arguments specify register assignments to - perform with this ROP chain. Argument names correspond to register - names. - - sym (str): If given, sym is the symbol name used to refer to the - inserted data. - """ - gadget = GadHint(0, requirements=sets) - return self._start_chain(gadget, sym=self._name("assign", sym)) - - def call(self, func, *params, sym=None): - """ - Insert a ROP chain to call function. - - func (int): Entry address of function to call. - *params (int): Remaining positional args are passed to func. - - sym (str): If given, sym is the symbol name used to refer to the - inserted data. - """ - register_params = dict(zip(arch.funcargs, params)) - stack_params = params[len(register_params):] - gadget = GadHint(func, requirements=register_params, stack=stack_params, - align=self.align_calls) - return self._start_chain(gadget, sym=self._name("call", sym)) - - def syscall(self, *params, sym=None): - """ - Insert a ROP chain to call kernel. - - *params (int): The first argument is the syscall number. Remaining - positional arguments are passed to the syscall. - - sym (str): If given, sym is the symbol name used to refer to the - inserted data. - """ - if len(params) > len(arch.kernargs): - raise TypeError("ROP: Too many arguments passed to syscall. " - "Target architecture supports up to {len(arch.kernargs)-1}.") - - register_params = dict(zip(arch.kernargs, params)) - gadget = self._get_gadget("syscall", {}).with_requirements(register_params) - return self._start_chain(gadget, sym=self._name("syscall", sym)) - - def memcpy(self, dst, src, *, sym=None): - """ - Insert a ROP chain to write data into memory. - - dst (int): The destination memory address. - src (bytes): The content to write. - - sym (str): If given, sym is the symbol name used to refer to the - inserted data. - """ - gadgets = [] - for idx in range(0, len(src), arch.wordsize): - g = self._get_write(dst + idx, btoi(src[idx:idx+arch.wordsize])) - gadgets.append(g) - return self._start_chain(*gadgets, sym=self._name("memcpy", sym)) - - def _get_hints(self): - """Return all user-supplied gadget hints.""" - return [h for obj in self.objects for _,h in obj.sym if type(h) is GadHint] - - def _discover_requirements(self, seen, graph, current): - """ - Populate gadget dependency graph. - - This function recursively looks up gadgets to ensure all necessary - required gadgets can be found, and stores this information into the - given graph object. Established dependencies encode the order that the - chain builder should attempt to satisfy register requirements. - Dependency loops are detected by the TopologicalSorter. - - seen (set): Set of (register,value) tuples we have already discovered. - graph (TopologicalSorter): Dependency graph model object. - current (GadHint): Current gadget we are processing. - """ - for r, v in current.requirements.items(): - # We key on register name _and_ value because some gadgets may - # only be capable of storing specific values in a target register. - # Requiring a register to store different values may require the - # use of multiple gadgets. - if (r, v) not in seen: - gadget = self._get_gadget(r, current.requirements) - - # Add gadget's requirements to the dependency graph. - # We say that each requirement is a 'successor' to this - # current gadget 'r', so that the chain builder will satisfy - # 'r' first. This prevents the fulfillment of 'r' from - # colbbering targets it requires, as the builder will satisfy - # them afterward. - for x in gadget.requirements: - graph.add(x, r) - - # Treat gadget's load immediates as pseudo-requirements for - # the sake of target ordering, following the same logic - # as above. - for x in gadget.imms: - graph.add(x, r) - - # Mark node as visited - seen.add((r, v)) - self._discover_requirements(seen, graph, gadget) - - def _get_gadget(self, target, sets): - """ - Get context-specific gadget. - - target (str): Either "ret", "syscall", or the name of a register we - would like to modify. - - sets (dict{str:int}): The set of other register requirements we are - trying to fulfill in parallel. Values may affect the gadget we decide - to use. - """ - # First, consider user-provided hints before automatically locating - # gadgets. - for hint in self._get_hints(): - # Setup additional requirements based on hint's register moves. - # If a mov target is in sets, require to set the src to the 'sets' - # value. - addl_reqs = { src:sets[dst] for dst, src in hint.movs.items() if dst in sets } - hint = hint.with_requirements(addl_reqs) - - # Pops will be accounted for by the chain builder. - # Immediates will be handled by gadget ordering in chain builder. - # Writes are a non-issue here. - - if hint.syscall: - # Only consider syscalls if the target is syscall. - if target == "syscall": - return hint - elif target in hint.imms: - if hint.imms[target] == sets[target]: - return hint - elif target in hint.pops: - return hint - elif target in hint.movs: - return hint - - # Automatically locate simple gadgets - if target == "ret": - return GadHint(self.gadget(arch.ret)) - - if target == "syscall": - insns = [arch.syscall, arch.ret] if self.safe_syscalls else [arch.syscall] - return GadHint(self.gadget(*insns), syscall=True) - - # target == register - insns = [ i.format(target) for i in arch.popgad ] - return GadHint(self.gadget(*insns), pops=[target], spm=arch.wordsize) - - def _get_clean(self, size): - """ - Get a stack cleaning gadget that moves sp by _at least_ size. - - size (int): Minimum stack pointer move. - """ - # spm values of zero (the default) can't be trusted, as in this case - # the user likely hasn't annotated the GadHint properly. Returning a - # larger move than requested is fine, since the chain builder can insert - # junk to be popped. - for hint in self._get_hints(): - if hint.spm >= size and hint.spm > 0: - return hint - - results = self.gadgets(*arch.cleangad) - table = { int(g.asm[0].group(1), 0): g for g in results } - sizes = sorted([ x for x in table.keys() if x >= size ]) - - if len(sizes) > 0: - return GadHint(table[sizes[0]], spm=sizes[0]) - - raise LookupError( - f"ROP: Need to define a stack move gadget of at least {size}") - - def _get_write(self, dst, src): - """ - Get a memory write gadget, injected with requirements for user data. - - dst (int): The intended memory write location. - src (int): The intended value to write. - """ - # If any exist, take the first write provided by user hints, assuming - # the user's intent to specifically use _this_ write. Follow-on gadgets - # to setup the dst and src registers must be findable. - for hint in self._get_hints(): - if hint.writes: - d, s = list(hint.writes.items())[0] - return hint.with_requirements({d:dst, s:src}) - - # Only take an automatic write gadget if we can prove up front that its - # requirements can be met, otherwise move on. A later search result may - # pass the test. - results = self.gadgets(*arch.writegad) - - for gad in results: - d = gad.asm[0].group("dst") - s = gad.asm[0].group("src") - - try: - # Assert requirements are met. - gadget = GadHint(gad, writes={d: s}, requirements={d:dst, s:src}) - self._discover_requirements(set(), TopologicalSorter(), gadget) - return gadget - except: - pass - - raise LookupError("ROP: Need to define gadgets for memory write / deps") - - def _start_chain(self, *gadgets, sym=None): - """ - Insert a generic ROP chain. - - *gadgets (GadHint): Annotated gadgets to prepare a chain from. - - sym (str): If given, sym is the symbol name used to refer to the - inserted data. - """ - stack = Payload(base=self.end()) - for g in gadgets: - self._build_chain(stack, g, {}) - return self.bin(stack(), sym=self._name("gadget", sym)) - - def _build_chain(self, stack, gadget, sets): - """ - Generate chain data for a given ROP gadget. - - This function recursively builds a ROP chain for the given gadget and - its requirements, storing data in the 'stack' object. - - stack (Payload): Stack data being constructed. - gadget (GadHint): Current gadget we are processing. - - sets (dict{str:int}): The set of other register requirements we are - trying to fulfill in parallel. - """ - # Form a to-do-list of registers from our immediate requirements, - # attempting to order them such that we avoid overwriting/conflicting - # values (this may not be possible). - reqs = gadget.requirements - graph = TopologicalSorter({ r:set() for r in reqs }) - self._discover_requirements(set(), graph, gadget) - to_do_list = [ x for x in graph.static_order() if x in reqs ] - - # Start chain by satisfying to-do-list requirements. - while len(to_do_list) > 0: - g = self._get_gadget(to_do_list[0], reqs) - self._build_chain(stack, g, reqs) - - # This gadget may satisfy multiple items in the to-do-list. - # Specifically, all of its pop and mov targets, and any load - # immediates that match our requirements. Non-matching - # immediates will be handled by a later gadget. - imms = g.imms.keys() & reqs.keys() - imms = [ x for x in imms if g.imms[x] == reqs[x] ] - done = g.pops + list(g.movs) + imms - to_do_list = [ x for x in to_do_list if x not in done ] - - # Append chain data to execute this gadget, but respect offset == 0 - # as a way to disable this gadget (perform a NULL gadget). - if gadget.offset != 0: - # Stack alignment if required. - if gadget.align: - align = -stack.end() % arch.alignment - stack.rep(itob(self._get_gadget("ret", {})), align) - - # "Return address" entry into this gadget. - stack.ret(gadget.offset) - - # The gadget's "inner stack data" will be values to be popped - # and additional junk data to be deallocated by the gadget itself. - sp_dest = len(stack) + gadget.spm - stack.int(*[ sets.get(p, self.POP_MAGIC) for p in gadget.pops ]) - if gadget.spm > 0: - stack.pad(sp_dest, self.SPM_MAGIC) - - # The gadget's "outer stack data" will be the additional values - # explicitly specified by the gadget. Append a separate gadget - # to clean up these values. - if len(gadget.stack) > 0: - size = len(gadget.stack) * arch.wordsize - - if self.clean_stack: - clean = self._get_clean(size) - stack.ret(clean) - sp_dest = len(stack) + clean.spm - else: - ret = self._get_gadget("ret", {}) - stack.ret(ret) - sp_dest = len(stack) + size - - stack.int(*gadget.stack) - stack.pad(sp_dest, self.SPM_MAGIC) diff --git a/sploit/payload/__init__.py b/sploit/payload/__init__.py new file mode 100644 index 0000000..78769b4 --- /dev/null +++ b/sploit/payload/__init__.py @@ -0,0 +1,3 @@ +from .gadhint import * +from .payload import * +from .rop import * diff --git a/sploit/payload/gadhint.py b/sploit/payload/gadhint.py new file mode 100644 index 0000000..9b077fe --- /dev/null +++ b/sploit/payload/gadhint.py @@ -0,0 +1,109 @@ +from dataclasses import dataclass, field +from sploit.rev.gadget import Gadget + +@dataclass +class GadHint: + """ + User-annotated gadget description object + + gadget (Gadget|int): The gadget being annotated. May be a Gadget object or + an offset as an int. + + pops (list[str]): The registers popped by this gadget, in order of + occurrence. + + movs (dict{str:str}): The register-to-register moves made by this gadget. + Keys are destination register names, values are source register names. The + order given is insignificant. + + imms (dict{str:int}): The immediate-to-register loads made by this gadget. + Keys are destination register names, values are immediate values. The order + given is insignificant. + + writes (dict{str:str}): The register-to-memory moves (stores) made by this + gadget. Keys are destination register names (expected to hold memory + locations), values are source register names (expected to hold direct + values). The order given is insignificant. + + requirements (dict{str:int}): The register state that is required before + this gadget should be executed. Keys are register names, values are the + required register values. + + stack (list[int]): A list of words to append to the stack following this + gadget. The first element given is nearest to the top of the stack and the + rest follow in order. + + align (bool): If True, this gadget expects the stack to be aligned prior + to entry. + + syscall (bool): If True, this gadget contains a syscall instruction. + + spm (int): "Stack pointer move" - The amount the stack pointer is adjusted + by this gadget. The effect of executing a terminating "return" instruction + should not be accounted for. A value of zero is taken as "unspecified". + """ + + gadget: int = 0 + pops: list = field(default_factory=list) + movs: dict = field(default_factory=dict) + imms: dict = field(default_factory=dict) + writes: dict = field(default_factory=dict) + requirements: dict = field(default_factory=dict) + stack: list = field(default_factory=list) + align: bool = False + syscall: bool = False + spm: int = 0 + + @property + def offset(self): + """Return gadget offset as an integer.""" + return int(self.gadget) + + def __index__(self): + """Convert object to integer using offset value.""" + return self.offset + + def __add__(self, x): + """Return new object with adjusted offset.""" + return GadHint(self.gadget + x, self.pops, self.movs, self.imms, + self.writes, self.requirements, self.stack, self.align, + self.syscall, self.spm) + + def __sub__(self, x): + """Return new object with adjusted offset.""" + return self + (-x) + + def with_requirements(self, reqs): + """Return new object with additional requirements.""" + for k, v in reqs.items(): + if self.requirements.get(k, v) != v: + raise ValueError( + f"GadHint: Conflicting gadget requirements: " + f"{self.requirements}, {reqs}") + + return GadHint(self.gadget, self.pops, self.movs, self.imms, + self.writes, self.requirements | reqs, self.stack, + self.align, self.syscall, self.spm) + + def __repr__(self): + """Return human-readable GadHint.""" + def fmt(name, prop): + if len(prop) > 0: + return f", {name}={prop}" + return "" + + s = hex(self.gadget) + s = f"Gadget({s})" if type(self.gadget) is Gadget else s + s += fmt("pops", self.pops) + s += fmt("movs", self.movs) + s += fmt("imms", self.imms) + s += fmt("writes", self.writes) + s += fmt("requirements", self.requirements) + s += fmt("stack", self.stack) + if self.align: + s += ", align" + if self.syscall: + s += ", syscall" + if self.spm > 0: + s += f", spm={self.spm}" + return f"GadHint({s})" diff --git a/sploit/payload/payload.py b/sploit/payload/payload.py new file mode 100644 index 0000000..cf105c6 --- /dev/null +++ b/sploit/payload/payload.py @@ -0,0 +1,94 @@ +from sploit.arch import arch, itob +from sploit.symtbl import Symtbl + +class Payload: + MAGIC = b'\xef' + + def __init__(self, **kwargs): + self.payload = b'' + self.sym = Symtbl(**kwargs) + self.ctrs = {} + + def __len__(self): + return len(self.payload) + + def __call__(self, badbytes=b''): + found = [ hex(x) for x in set(self.payload).intersection(badbytes) ] + if len(found) > 0: + raise Exception(f'Payload: bad bytes in content: {found}') + return self.payload + + def _name(self, kind, sym): + if sym is not None: return sym + try: ctr = self.ctrs[kind] + except: ctr = 0 + self.ctrs[kind] = ctr + 1 + return f'{kind}_{ctr}' + + def _append(self, value, sym): + (self.sym @ 0)[sym] = len(self) + self.payload += value + return self + + def _prepend(self, value, sym): + self.sym >>= len(value) + (self.sym @ 0)[sym] = 0 + self.payload = value + self.payload + return self + + def end(self): + return self.sym.base + len(self) + + def bin(self, *values, sym=None): + return self._append(b''.join(values), sym=self._name('bin', sym)) + + def str(self, *values, sym=None): + values = [ v.encode() + b'\x00' for v in values ] + return self.bin(*values, sym=self._name('str', sym)) + + def int(self, *values, sym=None): + values = [ itob(v) for v in values ] + return self.bin(*values, sym=self._name('int', sym)) + + def int8(self, *values, sym=None): + values = [ itob(v, 1) for v in values ] + return self.bin(*values, sym=self._name('int', sym)) + + def int16(self, *values, sym=None): + values = [ itob(v, 2) for v in values ] + return self.bin(*values, sym=self._name('int', sym)) + + def int32(self, *values, sym=None): + values = [ itob(v, 4) for v in values ] + return self.bin(*values, sym=self._name('int', sym)) + + def int64(self, *values, sym=None): + values = [ itob(v, 8) for v in values ] + return self.bin(*values, sym=self._name('int', sym)) + + def ret(self, *values, sym=None): + return self.int(*values, sym=self._name('ret', sym)) + + def sbp(self, *values, sym=None): + if len(values) == 0: + return self.rep(self.MAGIC, arch.wordsize, sym=self._name('sbp', sym)) + return self.int(*values, sym=self._name('sbp', sym)) + + def rep(self, value, size, sym=None): + return self.bin(self._rep_helper(value, size), sym=self._name('rep', sym)) + + def pad(self, size, value=None, sym=None): + return self.bin(self._pad_helper(size, value), sym=self._name('pad', sym)) + + def pad_front(self, size, value=None, sym=None): + return self._prepend(self._pad_helper(size, value), sym=self._name('pad', sym)) + + def _rep_helper(self, value, size, *, explain=''): + if size < 0: + raise Exception(f'Payload: {explain}rep: available space is negative') + if (size := size / len(value)) != int(size): + raise Exception(f'Payload: {explain}rep: element does not divide the space evenly') + return value * int(size) + + def _pad_helper(self, size, value): + return self._rep_helper(value or arch.nopcode, size - len(self), explain='pad: ') diff --git a/sploit/payload/rop.py b/sploit/payload/rop.py new file mode 100644 index 0000000..54226b4 --- /dev/null +++ b/sploit/payload/rop.py @@ -0,0 +1,383 @@ +""" +ROP chain generation utilities + +This module contains tools for automating basic return-oriented-programming +workloads, such as loading register values and calling into arbitrary functions +or syscalls. The tools are currently designed to work on x86 (32 or 64 bit) +and ARM (32 bit only). + +The main appeal of the ROP class is the ability to abstract away the manual +construction of ROP chain data, and instead make declarative statements like +"call this function with these arguments." The ROP class will also utilize its +supplied binary objects to automatically find and use trivial gadgets. + +The user is able to provide annotations for more complicated gadgets, which help +instruct the class how to incorporate them into a ROP chain. This is done with +the GadHint dataclass. GadHint objects are provided to a ROP instance by +including them in the Symtbl of one of the binary objects it is constructed with. +If applicable, a user-supplied gadget will take precedence over automatic gadget +searching. + +See the GadHint class to learn more about the descriptive attributes that are +supported. +""" + +from graphlib import TopologicalSorter + +from sploit.arch import arch, btoi, itob +from sploit.payload.gadhint import GadHint +from sploit.payload.payload import Payload + +class ROP(Payload): + """ + ROP-enabled payload builder + + POP_MAGIC (int): Magic value used for pop instructions where no specific + value is required by the user. + + SPM_MAGIC (bytes): Magic value to fill the stack with when the best + available cleaning gadget is larger than is necessary. + + objects (list[ELF]): The binary objects this ROP instance will consider + for gadget searching. + + safe_syscalls (bool): If True, require that automatically found syscall + instructions are immediately followed by a return instruction. + + align_calls (bool): If True, ensure that the stack return address into + function calls is aligned according to the architecture alignment property. + Knowledge of alignment is taken from the instance Symtbl's base value. + + clean_stack (bool): If True, attempt to locate a cleaning gadget to "pop" + stack data that is leftover from a function call. Required if attempting + to make multiple calls that involve stack-based arguments. + """ + + POP_MAGIC = 0xdead + SPM_MAGIC = b'\x69' + + def __init__(self, *objects, safe_syscalls=True, align_calls=True, + clean_stack=True, **symbols): + """Initialize new ROP builder instance.""" + super().__init__(**symbols) + self.objects = objects + self.safe_syscalls = safe_syscalls + self.align_calls = align_calls + self.clean_stack = clean_stack + + def gadgets(self, *regexes, cont=False): + """Return a list of matching gadgets, considering all objects.""" + results = [] + for obj in self.objects: + results += obj.gadgets(*regexes, cont=cont) + return results + + def gadget(self, *regexes): + """Return the first matching gadget, considering all objects.""" + for obj in self.objects: + try: + return obj.gadget(*regexes) + except: + pass + raise LookupError( + f"ROP: Need to define gadget symbol for {'; '.join(regexes)}") + + def assign(self, *, sym=None, **sets): + """ + Insert a ROP chain to control given registers. + + **sets (str:int): Keyword arguments specify register assignments to + perform with this ROP chain. Argument names correspond to register + names. + + sym (str): If given, sym is the symbol name used to refer to the + inserted data. + """ + gadget = GadHint(0, requirements=sets) + return self._start_chain(gadget, sym=self._name("assign", sym)) + + def call(self, func, *params, sym=None): + """ + Insert a ROP chain to call function. + + func (int): Entry address of function to call. + *params (int): Remaining positional args are passed to func. + + sym (str): If given, sym is the symbol name used to refer to the + inserted data. + """ + register_params = dict(zip(arch.funcargs, params)) + stack_params = params[len(register_params):] + gadget = GadHint(func, requirements=register_params, stack=stack_params, + align=self.align_calls) + return self._start_chain(gadget, sym=self._name("call", sym)) + + def syscall(self, *params, sym=None): + """ + Insert a ROP chain to call kernel. + + *params (int): The first argument is the syscall number. Remaining + positional arguments are passed to the syscall. + + sym (str): If given, sym is the symbol name used to refer to the + inserted data. + """ + if len(params) > len(arch.kernargs): + raise TypeError("ROP: Too many arguments passed to syscall. " + "Target architecture supports up to {len(arch.kernargs)-1}.") + + register_params = dict(zip(arch.kernargs, params)) + gadget = self._get_gadget("syscall", {}).with_requirements(register_params) + return self._start_chain(gadget, sym=self._name("syscall", sym)) + + def memcpy(self, dst, src, *, sym=None): + """ + Insert a ROP chain to write data into memory. + + dst (int): The destination memory address. + src (bytes): The content to write. + + sym (str): If given, sym is the symbol name used to refer to the + inserted data. + """ + gadgets = [] + for idx in range(0, len(src), arch.wordsize): + g = self._get_write(dst + idx, btoi(src[idx:idx+arch.wordsize])) + gadgets.append(g) + return self._start_chain(*gadgets, sym=self._name("memcpy", sym)) + + def _get_hints(self): + """Return all user-supplied gadget hints.""" + return [h for obj in self.objects for _,h in obj.sym if type(h) is GadHint] + + def _discover_requirements(self, seen, graph, current): + """ + Populate gadget dependency graph. + + This function recursively looks up gadgets to ensure all necessary + required gadgets can be found, and stores this information into the + given graph object. Established dependencies encode the order that the + chain builder should attempt to satisfy register requirements. + Dependency loops are detected by the TopologicalSorter. + + seen (set): Set of (register,value) tuples we have already discovered. + graph (TopologicalSorter): Dependency graph model object. + current (GadHint): Current gadget we are processing. + """ + for r, v in current.requirements.items(): + # We key on register name _and_ value because some gadgets may + # only be capable of storing specific values in a target register. + # Requiring a register to store different values may require the + # use of multiple gadgets. + if (r, v) not in seen: + gadget = self._get_gadget(r, current.requirements) + + # Add gadget's requirements to the dependency graph. + # We say that each requirement is a 'successor' to this + # current gadget 'r', so that the chain builder will satisfy + # 'r' first. This prevents the fulfillment of 'r' from + # colbbering targets it requires, as the builder will satisfy + # them afterward. + for x in gadget.requirements: + graph.add(x, r) + + # Treat gadget's load immediates as pseudo-requirements for + # the sake of target ordering, following the same logic + # as above. + for x in gadget.imms: + graph.add(x, r) + + # Mark node as visited + seen.add((r, v)) + self._discover_requirements(seen, graph, gadget) + + def _get_gadget(self, target, sets): + """ + Get context-specific gadget. + + target (str): Either "ret", "syscall", or the name of a register we + would like to modify. + + sets (dict{str:int}): The set of other register requirements we are + trying to fulfill in parallel. Values may affect the gadget we decide + to use. + """ + # First, consider user-provided hints before automatically locating + # gadgets. + for hint in self._get_hints(): + # Setup additional requirements based on hint's register moves. + # If a mov target is in sets, require to set the src to the 'sets' + # value. + addl_reqs = { src:sets[dst] for dst, src in hint.movs.items() if dst in sets } + hint = hint.with_requirements(addl_reqs) + + # Pops will be accounted for by the chain builder. + # Immediates will be handled by gadget ordering in chain builder. + # Writes are a non-issue here. + + if hint.syscall: + # Only consider syscalls if the target is syscall. + if target == "syscall": + return hint + elif target in hint.imms: + if hint.imms[target] == sets[target]: + return hint + elif target in hint.pops: + return hint + elif target in hint.movs: + return hint + + # Automatically locate simple gadgets + if target == "ret": + return GadHint(self.gadget(arch.ret)) + + if target == "syscall": + insns = [arch.syscall, arch.ret] if self.safe_syscalls else [arch.syscall] + return GadHint(self.gadget(*insns), syscall=True) + + # target == register + insns = [ i.format(target) for i in arch.popgad ] + return GadHint(self.gadget(*insns), pops=[target], spm=arch.wordsize) + + def _get_clean(self, size): + """ + Get a stack cleaning gadget that moves sp by _at least_ size. + + size (int): Minimum stack pointer move. + """ + # spm values of zero (the default) can't be trusted, as in this case + # the user likely hasn't annotated the GadHint properly. Returning a + # larger move than requested is fine, since the chain builder can insert + # junk to be popped. + for hint in self._get_hints(): + if hint.spm >= size and hint.spm > 0: + return hint + + results = self.gadgets(*arch.cleangad) + table = { int(g.asm[0].group(1), 0): g for g in results } + sizes = sorted([ x for x in table.keys() if x >= size ]) + + if len(sizes) > 0: + return GadHint(table[sizes[0]], spm=sizes[0]) + + raise LookupError( + f"ROP: Need to define a stack move gadget of at least {size}") + + def _get_write(self, dst, src): + """ + Get a memory write gadget, injected with requirements for user data. + + dst (int): The intended memory write location. + src (int): The intended value to write. + """ + # If any exist, take the first write provided by user hints, assuming + # the user's intent to specifically use _this_ write. Follow-on gadgets + # to setup the dst and src registers must be findable. + for hint in self._get_hints(): + if hint.writes: + d, s = list(hint.writes.items())[0] + return hint.with_requirements({d:dst, s:src}) + + # Only take an automatic write gadget if we can prove up front that its + # requirements can be met, otherwise move on. A later search result may + # pass the test. + results = self.gadgets(*arch.writegad) + + for gad in results: + d = gad.asm[0].group("dst") + s = gad.asm[0].group("src") + + try: + # Assert requirements are met. + gadget = GadHint(gad, writes={d: s}, requirements={d:dst, s:src}) + self._discover_requirements(set(), TopologicalSorter(), gadget) + return gadget + except: + pass + + raise LookupError("ROP: Need to define gadgets for memory write / deps") + + def _start_chain(self, *gadgets, sym=None): + """ + Insert a generic ROP chain. + + *gadgets (GadHint): Annotated gadgets to prepare a chain from. + + sym (str): If given, sym is the symbol name used to refer to the + inserted data. + """ + stack = Payload(base=self.end()) + for g in gadgets: + self._build_chain(stack, g, {}) + return self.bin(stack(), sym=self._name("gadget", sym)) + + def _build_chain(self, stack, gadget, sets): + """ + Generate chain data for a given ROP gadget. + + This function recursively builds a ROP chain for the given gadget and + its requirements, storing data in the 'stack' object. + + stack (Payload): Stack data being constructed. + gadget (GadHint): Current gadget we are processing. + + sets (dict{str:int}): The set of other register requirements we are + trying to fulfill in parallel. + """ + # Form a to-do-list of registers from our immediate requirements, + # attempting to order them such that we avoid overwriting/conflicting + # values (this may not be possible). + reqs = gadget.requirements + graph = TopologicalSorter({ r:set() for r in reqs }) + self._discover_requirements(set(), graph, gadget) + to_do_list = [ x for x in graph.static_order() if x in reqs ] + + # Start chain by satisfying to-do-list requirements. + while len(to_do_list) > 0: + g = self._get_gadget(to_do_list[0], reqs) + self._build_chain(stack, g, reqs) + + # This gadget may satisfy multiple items in the to-do-list. + # Specifically, all of its pop and mov targets, and any load + # immediates that match our requirements. Non-matching + # immediates will be handled by a later gadget. + imms = g.imms.keys() & reqs.keys() + imms = [ x for x in imms if g.imms[x] == reqs[x] ] + done = g.pops + list(g.movs) + imms + to_do_list = [ x for x in to_do_list if x not in done ] + + # Append chain data to execute this gadget, but respect offset == 0 + # as a way to disable this gadget (perform a NULL gadget). + if gadget.offset != 0: + # Stack alignment if required. + if gadget.align: + align = -stack.end() % arch.alignment + stack.rep(itob(self._get_gadget("ret", {})), align) + + # "Return address" entry into this gadget. + stack.ret(gadget.offset) + + # The gadget's "inner stack data" will be values to be popped + # and additional junk data to be deallocated by the gadget itself. + sp_dest = len(stack) + gadget.spm + stack.int(*[ sets.get(p, self.POP_MAGIC) for p in gadget.pops ]) + if gadget.spm > 0: + stack.pad(sp_dest, self.SPM_MAGIC) + + # The gadget's "outer stack data" will be the additional values + # explicitly specified by the gadget. Append a separate gadget + # to clean up these values. + if len(gadget.stack) > 0: + size = len(gadget.stack) * arch.wordsize + + if self.clean_stack: + clean = self._get_clean(size) + stack.ret(clean) + sp_dest = len(stack) + clean.spm + else: + ret = self._get_gadget("ret", {}) + stack.ret(ret) + sp_dest = len(stack) + size + + stack.int(*gadget.stack) + stack.pad(sp_dest, self.SPM_MAGIC) -- cgit v1.2.3 From ead4ec1340555569e00919891383e05dca839b01 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Fri, 12 Jan 2024 14:28:26 -0500 Subject: util: Promote from module to package We would like to move additional modules under the namespace of "util" to clean up the top-level "sploit" package. To start, the functions from the previous util module are moved. Given the package is named "util" the module is renamed to "cmd" to somewhat match the theme of the contained functions. Per the previous commits, these functions are now exposed via the util package as well. Signed-off-by: Malfurious --- sploit/rev/ldd.py | 2 +- sploit/rev/r2.py | 2 +- sploit/util.py | 26 -------------------------- sploit/util/__init__.py | 1 + sploit/util/cmd.py | 26 ++++++++++++++++++++++++++ 5 files changed, 29 insertions(+), 28 deletions(-) delete mode 100644 sploit/util.py create mode 100644 sploit/util/__init__.py create mode 100644 sploit/util/cmd.py (limited to 'sploit') diff --git a/sploit/rev/ldd.py b/sploit/rev/ldd.py index 1a28c7c..b5bc564 100644 --- a/sploit/rev/ldd.py +++ b/sploit/rev/ldd.py @@ -1,4 +1,4 @@ -from sploit.util import run_cmd_cached +from sploit.util.cmd import run_cmd_cached from sploit.log import ilog import re diff --git a/sploit/rev/r2.py b/sploit/rev/r2.py index 1be731c..d9dbabd 100644 --- a/sploit/rev/r2.py +++ b/sploit/rev/r2.py @@ -2,7 +2,7 @@ from sploit.arch import arch from sploit.log import ilog from sploit.rev.gadget import Gadget from sploit.symtbl import Symtbl -from sploit.util import run_cmd_cached +from sploit.util.cmd import run_cmd_cached from collections import namedtuple as nt from functools import cache diff --git a/sploit/util.py b/sploit/util.py deleted file mode 100644 index 3a2b842..0000000 --- a/sploit/util.py +++ /dev/null @@ -1,26 +0,0 @@ -from os import path -from subprocess import run - -def run_cmd(cmd,cwd=None): - return run(cmd,cwd=cwd,capture_output=True,text=True,check=True).stdout.split('\n')[:-1] - -__RUN_CACHE__ = {} -def run_cmd_cached(cmd,cwd=None): - key = ''.join(cmd) - if key in __RUN_CACHE__: - return __RUN_CACHE__[key] - else: - result = run_cmd(cmd,cwd) - __RUN_CACHE__[key] = result - return result - -#try to get the version through git -def git_version(): - try: - cwd = path.dirname(path.realpath(__file__)) - version = run_cmd(["git","describe","--always","--first-parent","--dirty"],cwd=cwd)[0] - #PEP440 compliance - version = version.replace('-','+',1).replace('-','.') - return version - except: - return "0+unknown.version" diff --git a/sploit/util/__init__.py b/sploit/util/__init__.py new file mode 100644 index 0000000..9103509 --- /dev/null +++ b/sploit/util/__init__.py @@ -0,0 +1 @@ +from .cmd import * diff --git a/sploit/util/cmd.py b/sploit/util/cmd.py new file mode 100644 index 0000000..3a2b842 --- /dev/null +++ b/sploit/util/cmd.py @@ -0,0 +1,26 @@ +from os import path +from subprocess import run + +def run_cmd(cmd,cwd=None): + return run(cmd,cwd=cwd,capture_output=True,text=True,check=True).stdout.split('\n')[:-1] + +__RUN_CACHE__ = {} +def run_cmd_cached(cmd,cwd=None): + key = ''.join(cmd) + if key in __RUN_CACHE__: + return __RUN_CACHE__[key] + else: + result = run_cmd(cmd,cwd) + __RUN_CACHE__[key] = result + return result + +#try to get the version through git +def git_version(): + try: + cwd = path.dirname(path.realpath(__file__)) + version = run_cmd(["git","describe","--always","--first-parent","--dirty"],cwd=cwd)[0] + #PEP440 compliance + version = version.replace('-','+',1).replace('-','.') + return version + except: + return "0+unknown.version" -- cgit v1.2.3 From 074a15310b8bbeeeeb00bf5ab5877c12f1ca1861 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Fri, 12 Jan 2024 16:26:03 -0500 Subject: log: Move to sploit.util package Signed-off-by: Malfurious --- sploit/__init__.py | 1 - sploit/comm.py | 2 +- sploit/log.py | 32 -------------------------------- sploit/main.py | 2 +- sploit/rev/ldd.py | 2 +- sploit/rev/r2.py | 2 +- sploit/util/__init__.py | 1 + sploit/util/log.py | 32 ++++++++++++++++++++++++++++++++ 8 files changed, 37 insertions(+), 37 deletions(-) delete mode 100644 sploit/log.py create mode 100644 sploit/util/log.py (limited to 'sploit') diff --git a/sploit/__init__.py b/sploit/__init__.py index f761ead..f443fe9 100644 --- a/sploit/__init__.py +++ b/sploit/__init__.py @@ -1,7 +1,6 @@ from sploit import ( arch, comm, - log, payload, rev, symtbl, diff --git a/sploit/comm.py b/sploit/comm.py index 522d540..3bc448e 100644 --- a/sploit/comm.py +++ b/sploit/comm.py @@ -4,8 +4,8 @@ import os import sys import select -from sploit.log import * from sploit.until import bind +from sploit.util.log import * class Comm: logonread = True diff --git a/sploit/log.py b/sploit/log.py deleted file mode 100644 index 823b252..0000000 --- a/sploit/log.py +++ /dev/null @@ -1,32 +0,0 @@ -import codecs -import sys - -# https://docs.python.org/3/library/codecs.html#standard-encodings -ENCODING = None - -ERROR = 31 -WARNING = 33 -STATUS = 32 -NORMAL = 0 -ALT = 90 - -def enc_value(value, enc): - if type(value) is bytes: - if enc is not None: - value = codecs.encode(value, enc) - elif ENCODING is not None: - value = codecs.encode(value, ENCODING) - value = str(value)[2:-1] # strip b'' - return str(value) - -def generic_log(*values, sep, end, file, flush, enc, color): - string = sep.join([ enc_value(x, enc) for x in values ]) - print(f'\033[{color}m{string}\033[0m', end=end, file=file, flush=flush) - -# For library internal use -def ilog(*values, sep=' ', end='\n', file=sys.stderr, flush=True, enc=None, color=STATUS): - generic_log(*values, sep=sep, end=end, file=file, flush=flush, enc=enc, color=color) - -# For external use in user script (via print = elog) -def elog(*values, sep=' ', end='\n', file=sys.stdout, flush=True, enc=None, color=ALT): - generic_log(*values, sep=sep, end=end, file=file, flush=flush, enc=enc, color=color) diff --git a/sploit/main.py b/sploit/main.py index 6d71196..0d022f2 100644 --- a/sploit/main.py +++ b/sploit/main.py @@ -5,7 +5,7 @@ import tempfile import traceback from sploit.comm import * -from sploit.log import * +from sploit.util.log import * from sploit import __version__ def print_banner(color, line1=__version__, line2='', line3=''): diff --git a/sploit/rev/ldd.py b/sploit/rev/ldd.py index b5bc564..b773abf 100644 --- a/sploit/rev/ldd.py +++ b/sploit/rev/ldd.py @@ -1,5 +1,5 @@ from sploit.util.cmd import run_cmd_cached -from sploit.log import ilog +from sploit.util.log import ilog import re from collections import namedtuple as nt diff --git a/sploit/rev/r2.py b/sploit/rev/r2.py index d9dbabd..e81adc9 100644 --- a/sploit/rev/r2.py +++ b/sploit/rev/r2.py @@ -1,8 +1,8 @@ from sploit.arch import arch -from sploit.log import ilog from sploit.rev.gadget import Gadget from sploit.symtbl import Symtbl from sploit.util.cmd import run_cmd_cached +from sploit.util.log import ilog from collections import namedtuple as nt from functools import cache diff --git a/sploit/util/__init__.py b/sploit/util/__init__.py index 9103509..32a079b 100644 --- a/sploit/util/__init__.py +++ b/sploit/util/__init__.py @@ -1 +1,2 @@ from .cmd import * +from .log import * diff --git a/sploit/util/log.py b/sploit/util/log.py new file mode 100644 index 0000000..823b252 --- /dev/null +++ b/sploit/util/log.py @@ -0,0 +1,32 @@ +import codecs +import sys + +# https://docs.python.org/3/library/codecs.html#standard-encodings +ENCODING = None + +ERROR = 31 +WARNING = 33 +STATUS = 32 +NORMAL = 0 +ALT = 90 + +def enc_value(value, enc): + if type(value) is bytes: + if enc is not None: + value = codecs.encode(value, enc) + elif ENCODING is not None: + value = codecs.encode(value, ENCODING) + value = str(value)[2:-1] # strip b'' + return str(value) + +def generic_log(*values, sep, end, file, flush, enc, color): + string = sep.join([ enc_value(x, enc) for x in values ]) + print(f'\033[{color}m{string}\033[0m', end=end, file=file, flush=flush) + +# For library internal use +def ilog(*values, sep=' ', end='\n', file=sys.stderr, flush=True, enc=None, color=STATUS): + generic_log(*values, sep=sep, end=end, file=file, flush=flush, enc=enc, color=color) + +# For external use in user script (via print = elog) +def elog(*values, sep=' ', end='\n', file=sys.stdout, flush=True, enc=None, color=ALT): + generic_log(*values, sep=sep, end=end, file=file, flush=flush, enc=enc, color=color) -- cgit v1.2.3 From 03e1f242f69903fcd132010228259231301b3f43 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Sat, 13 Jan 2024 08:45:40 -0500 Subject: comm: Promote from module to package This is done to help clean the top-level "sploit" package. Furthermore, there is some planned future work to refactor comm into multiple modules, so this lays some groundwork for that. Signed-off-by: Malfurious --- sploit/comm.py | 205 ------------------------------------------------ sploit/comm/__init__.py | 1 + sploit/comm/comm.py | 205 ++++++++++++++++++++++++++++++++++++++++++++++++ sploit/main.py | 2 +- 4 files changed, 207 insertions(+), 206 deletions(-) delete mode 100644 sploit/comm.py create mode 100644 sploit/comm/__init__.py create mode 100644 sploit/comm/comm.py (limited to 'sploit') diff --git a/sploit/comm.py b/sploit/comm.py deleted file mode 100644 index 3bc448e..0000000 --- a/sploit/comm.py +++ /dev/null @@ -1,205 +0,0 @@ -import subprocess -import tempfile -import os -import sys -import select - -from sploit.until import bind -from sploit.util.log import * - -class Comm: - logonread = True - logonwrite = False - flushonwrite = True - readonwrite = False - timeout = 250 # milliseconds - last = None # result of last discrete read - - def __init__(self, backend): - self.back = backend - - def shutdown(self): - try: - self.back.stdout.close() - except BrokenPipeError: - pass - - def read(self, size=-1): - if size < 0: - return self.readall_nonblock() - elif size == 0: - data = b'' - else: - data = self.back.stdin.read(size) - if(data == b''): - raise BrokenPipeError('Tried to read on broken pipe') - if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) - self.last = data - return data - - def readline(self): - data = self.back.stdin.readline() - if(data == b''): - raise BrokenPipeError('Tried to read on broken pipe') - if data.endswith(b'\n'): - data = data[:-1] - if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) - self.last = data - return data - - def readall(self): - data = b'' - try: - for line in self.back.stdin: - tolog = (line[:-1] if line.endswith(b'\n') else line) - if self.logonread : ilog(tolog, file=sys.stdout, color=NORMAL) - data += line - except KeyboardInterrupt: - pass - self.last = data - return data - - def readall_nonblock(self): - try: - data = b'' - os.set_blocking(self.back.stdin.fileno(), False) - poll = select.poll() - poll.register(self.back.stdin, select.POLLIN) - while True: - poll.poll(self.timeout) - d = self.readall() - if len(d) == 0: - self.last = data - return data - data += d - finally: - os.set_blocking(self.back.stdin.fileno(), True) - - def readuntil(self, pred, /, *args, **kwargs): - data = b'' - pred = bind(pred, *args, **kwargs) - l = self.logonread - self.logonread = False - try: - while(True): - data += self.read(1) - if(pred(data)): - break - finally: - self.logonread = l - if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) - self.last = data - return data - - def readlineuntil(self, pred, /, *args, **kwargs): - dataarr = [] - pred = bind(pred, *args, **kwargs) - while(True): - dataarr.append(self.readline()) - if(pred(dataarr)): - break - self.last = dataarr - return dataarr - - def write(self, data): - self.back.stdout.write(data) - if self.flushonwrite : self.back.stdout.flush() - if self.logonwrite : ilog(data, file=sys.stdout, color=ALT) - if self.readonwrite : self.readall_nonblock() - - def writeline(self, data=b''): - self.write(data + b'\n') - - def interact(self): - stdin = sys.stdin.buffer - event = select.POLLIN - - def readall_stdin(): - try: - os.set_blocking(stdin.fileno(), False) - for line in stdin: - self.write(line) - finally: - os.set_blocking(stdin.fileno(), True) - - readtable = { - self.back.stdin.fileno(): self.readall_nonblock, - stdin.fileno(): readall_stdin, - } - - try: - ilog("<--Interact Mode-->") - l = self.logonread - self.logonread = True - - poll = select.poll() - poll.register(self.back.stdin, event) - poll.register(stdin, event) - - readtable[self.back.stdin.fileno()]() - while True: - for fd, e in poll.poll(self.timeout): - if not e & event: return - readtable[fd]() - except KeyboardInterrupt: - pass - finally: - self.logonread = l - ilog("<--Interact Mode Done-->") - -def popen(cmdline=''): - io = Comm((Process(cmdline.split()) if len(cmdline) > 0 else Pipes())) - io.readall_nonblock() - io.readonwrite = True - return io - -class Process: - def __init__(self, args): - ilog(f"Running: {' '.join(args)}") - self.proc = subprocess.Popen(args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - preexec_fn=lambda : os.setpgrp()) - ilog(f"PID: {self.proc.pid}") - self.stdin = self.proc.stdout - self.stdout = self.proc.stdin - - def __del__(self): - if getattr(self, 'proc', None) == None : return - if(self.proc.poll() != None): - return - try: - ilog("Waiting on Target Program to End...") - ilog("Press Ctrl+C to Forcefully Kill It...") - self.proc.wait() - except KeyboardInterrupt: - self.proc.kill() - -class Pipes: - def __init__(self, tmp=None): - if(tmp == None): - self.dir = tempfile.TemporaryDirectory() - dirname = self.dir.name - else: - if(not os.path.exists(tmp)): - os.mkdir(tmp) - dirname = tmp - self.pathin = os.path.join(dirname, "in") - self.pathout = os.path.join(dirname, "out") - os.mkfifo(self.pathin) - os.mkfifo(self.pathout) - ilog("Waiting on Target to Connect...", file=sys.stdout) - ilog(f"<{self.pathin} >{self.pathout}", file=sys.stdout) - self.stdout = open(self.pathin, "wb") - self.stdin = open(self.pathout, "rb") - ilog("Connected!") - - def __del__(self): - try: - if getattr(self,'stdout',None) : self.stdout.close() - if getattr(self,'stdin',None) : self.stdin.close() - except BrokenPipeError: - pass - if getattr(self,'pathin',None) and os.path.exists(self.pathin) : os.unlink(self.pathin) - if getattr(self,'pathout',None) and os.path.exists(self.pathout) : os.unlink(self.pathout) diff --git a/sploit/comm/__init__.py b/sploit/comm/__init__.py new file mode 100644 index 0000000..ffbc402 --- /dev/null +++ b/sploit/comm/__init__.py @@ -0,0 +1 @@ +from .comm import * diff --git a/sploit/comm/comm.py b/sploit/comm/comm.py new file mode 100644 index 0000000..3bc448e --- /dev/null +++ b/sploit/comm/comm.py @@ -0,0 +1,205 @@ +import subprocess +import tempfile +import os +import sys +import select + +from sploit.until import bind +from sploit.util.log import * + +class Comm: + logonread = True + logonwrite = False + flushonwrite = True + readonwrite = False + timeout = 250 # milliseconds + last = None # result of last discrete read + + def __init__(self, backend): + self.back = backend + + def shutdown(self): + try: + self.back.stdout.close() + except BrokenPipeError: + pass + + def read(self, size=-1): + if size < 0: + return self.readall_nonblock() + elif size == 0: + data = b'' + else: + data = self.back.stdin.read(size) + if(data == b''): + raise BrokenPipeError('Tried to read on broken pipe') + if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) + self.last = data + return data + + def readline(self): + data = self.back.stdin.readline() + if(data == b''): + raise BrokenPipeError('Tried to read on broken pipe') + if data.endswith(b'\n'): + data = data[:-1] + if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) + self.last = data + return data + + def readall(self): + data = b'' + try: + for line in self.back.stdin: + tolog = (line[:-1] if line.endswith(b'\n') else line) + if self.logonread : ilog(tolog, file=sys.stdout, color=NORMAL) + data += line + except KeyboardInterrupt: + pass + self.last = data + return data + + def readall_nonblock(self): + try: + data = b'' + os.set_blocking(self.back.stdin.fileno(), False) + poll = select.poll() + poll.register(self.back.stdin, select.POLLIN) + while True: + poll.poll(self.timeout) + d = self.readall() + if len(d) == 0: + self.last = data + return data + data += d + finally: + os.set_blocking(self.back.stdin.fileno(), True) + + def readuntil(self, pred, /, *args, **kwargs): + data = b'' + pred = bind(pred, *args, **kwargs) + l = self.logonread + self.logonread = False + try: + while(True): + data += self.read(1) + if(pred(data)): + break + finally: + self.logonread = l + if self.logonread : ilog(data, file=sys.stdout, color=NORMAL) + self.last = data + return data + + def readlineuntil(self, pred, /, *args, **kwargs): + dataarr = [] + pred = bind(pred, *args, **kwargs) + while(True): + dataarr.append(self.readline()) + if(pred(dataarr)): + break + self.last = dataarr + return dataarr + + def write(self, data): + self.back.stdout.write(data) + if self.flushonwrite : self.back.stdout.flush() + if self.logonwrite : ilog(data, file=sys.stdout, color=ALT) + if self.readonwrite : self.readall_nonblock() + + def writeline(self, data=b''): + self.write(data + b'\n') + + def interact(self): + stdin = sys.stdin.buffer + event = select.POLLIN + + def readall_stdin(): + try: + os.set_blocking(stdin.fileno(), False) + for line in stdin: + self.write(line) + finally: + os.set_blocking(stdin.fileno(), True) + + readtable = { + self.back.stdin.fileno(): self.readall_nonblock, + stdin.fileno(): readall_stdin, + } + + try: + ilog("<--Interact Mode-->") + l = self.logonread + self.logonread = True + + poll = select.poll() + poll.register(self.back.stdin, event) + poll.register(stdin, event) + + readtable[self.back.stdin.fileno()]() + while True: + for fd, e in poll.poll(self.timeout): + if not e & event: return + readtable[fd]() + except KeyboardInterrupt: + pass + finally: + self.logonread = l + ilog("<--Interact Mode Done-->") + +def popen(cmdline=''): + io = Comm((Process(cmdline.split()) if len(cmdline) > 0 else Pipes())) + io.readall_nonblock() + io.readonwrite = True + return io + +class Process: + def __init__(self, args): + ilog(f"Running: {' '.join(args)}") + self.proc = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=lambda : os.setpgrp()) + ilog(f"PID: {self.proc.pid}") + self.stdin = self.proc.stdout + self.stdout = self.proc.stdin + + def __del__(self): + if getattr(self, 'proc', None) == None : return + if(self.proc.poll() != None): + return + try: + ilog("Waiting on Target Program to End...") + ilog("Press Ctrl+C to Forcefully Kill It...") + self.proc.wait() + except KeyboardInterrupt: + self.proc.kill() + +class Pipes: + def __init__(self, tmp=None): + if(tmp == None): + self.dir = tempfile.TemporaryDirectory() + dirname = self.dir.name + else: + if(not os.path.exists(tmp)): + os.mkdir(tmp) + dirname = tmp + self.pathin = os.path.join(dirname, "in") + self.pathout = os.path.join(dirname, "out") + os.mkfifo(self.pathin) + os.mkfifo(self.pathout) + ilog("Waiting on Target to Connect...", file=sys.stdout) + ilog(f"<{self.pathin} >{self.pathout}", file=sys.stdout) + self.stdout = open(self.pathin, "wb") + self.stdin = open(self.pathout, "rb") + ilog("Connected!") + + def __del__(self): + try: + if getattr(self,'stdout',None) : self.stdout.close() + if getattr(self,'stdin',None) : self.stdin.close() + except BrokenPipeError: + pass + if getattr(self,'pathin',None) and os.path.exists(self.pathin) : os.unlink(self.pathin) + if getattr(self,'pathout',None) and os.path.exists(self.pathout) : os.unlink(self.pathout) diff --git a/sploit/main.py b/sploit/main.py index 0d022f2..5fd5192 100644 --- a/sploit/main.py +++ b/sploit/main.py @@ -4,7 +4,7 @@ from os.path import isdir import tempfile import traceback -from sploit.comm import * +from sploit.comm.comm import * from sploit.util.log import * from sploit import __version__ -- cgit v1.2.3 From d051f6d7daa4e0c025dcb58af392cca47c6019d7 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Sat, 13 Jan 2024 09:26:09 -0500 Subject: Remove extra "main.py" file The CLI logic is moved to sploit/__main__.py. This file is now the target of: - python -m sploit - sploit.py (via import) - sploit (installed executable - via pyproject.toml) A module guard (`if __name__ == "__main__"`) is added to allow the application to run when this file is invoked directly. And the entrypoint symlink is no longer necessary. Signed-off-by: Malfurious --- sploit/__main__.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++- sploit/main.py | 65 -------------------------------------------------- 2 files changed, 69 insertions(+), 66 deletions(-) mode change 120000 => 100644 sploit/__main__.py delete mode 100644 sploit/main.py (limited to 'sploit') diff --git a/sploit/__main__.py b/sploit/__main__.py deleted file mode 120000 index 98537fc..0000000 --- a/sploit/__main__.py +++ /dev/null @@ -1 +0,0 @@ -../sploit.py \ No newline at end of file diff --git a/sploit/__main__.py b/sploit/__main__.py new file mode 100644 index 0000000..5b694a2 --- /dev/null +++ b/sploit/__main__.py @@ -0,0 +1,69 @@ +from argparse import ArgumentParser, REMAINDER +import gc +from os.path import isdir +import tempfile +import traceback + +from sploit.comm.comm import * +from sploit.util.log import * +from sploit import __version__ + +def print_banner(color, line1=__version__, line2='', line3=''): + ilog() + ilog(' ░▒█▀▀▀█░▒█▀▀█░▒█░░░░▒█▀▀▀█░▀█▀░▀▀█▀▀ ', end='', color=ALT) + ilog(line1, color=ALT) + ilog(' ░░▀▀▀▄▄░▒█▄▄█░▒█░░░░▒█░░▒█░▒█░░░▒█░░ ', end='', color=color) + ilog(line2, color=ALT) + ilog(' ░▒█▄▄▄█░▒█░░░░▒█▄▄█░▒█▄▄▄█░▄█▄░░▒█░░ ', end='', color=ALT) + ilog(line3, color=ALT) + ilog() + +def main(): + parser = ArgumentParser(description='Execute Sploit script against target') + parser.add_argument('script', help='Exploit script to run') + parser.add_argument('target', nargs=REMAINDER, help='Target cmdline or pipes directory') + args = parser.parse_args() + + if len(args.target) == 0: + with tempfile.TemporaryDirectory() as tmpdir: + pipe(args.script, tmpdir) + elif len(args.target) == 1 and isdir(args.target[0]): + pipe(args.script, args.target[0]) + else: + target(args.script, args.target) + +def pipe(script, tmpdir): + print_banner(ERROR, line3='Pipe Mode') + while True: + try: + p = Pipes(tmpdir) + except KeyboardInterrupt: + break + runscript(script, Comm(p)) + del p + +def target(script, target): + print_banner(STATUS, line3='Subprocess Mode') + runscript(script, Comm(Process(target))) + +def runscript(script, comm): + try: + ilog("Running Script...") + code = compile(open(script).read(), script, 'exec') + exec(code, {'io': comm, 'print': elog}) + ilog("Script Finished!") + return + except KeyboardInterrupt: + pass + except: + ilog(traceback.format_exc(), end='', color=ERROR) + finally: + comm.shutdown() + comm.readall() + gc.collect() + + ilog("Script Ended Early!", color=WARNING) + + +if __name__ == "__main__": + main() diff --git a/sploit/main.py b/sploit/main.py deleted file mode 100644 index 5fd5192..0000000 --- a/sploit/main.py +++ /dev/null @@ -1,65 +0,0 @@ -from argparse import ArgumentParser, REMAINDER -import gc -from os.path import isdir -import tempfile -import traceback - -from sploit.comm.comm import * -from sploit.util.log import * -from sploit import __version__ - -def print_banner(color, line1=__version__, line2='', line3=''): - ilog() - ilog(' ░▒█▀▀▀█░▒█▀▀█░▒█░░░░▒█▀▀▀█░▀█▀░▀▀█▀▀ ', end='', color=ALT) - ilog(line1, color=ALT) - ilog(' ░░▀▀▀▄▄░▒█▄▄█░▒█░░░░▒█░░▒█░▒█░░░▒█░░ ', end='', color=color) - ilog(line2, color=ALT) - ilog(' ░▒█▄▄▄█░▒█░░░░▒█▄▄█░▒█▄▄▄█░▄█▄░░▒█░░ ', end='', color=ALT) - ilog(line3, color=ALT) - ilog() - -def main(): - parser = ArgumentParser(description='Execute Sploit script against target') - parser.add_argument('script', help='Exploit script to run') - parser.add_argument('target', nargs=REMAINDER, help='Target cmdline or pipes directory') - args = parser.parse_args() - - if len(args.target) == 0: - with tempfile.TemporaryDirectory() as tmpdir: - pipe(args.script, tmpdir) - elif len(args.target) == 1 and isdir(args.target[0]): - pipe(args.script, args.target[0]) - else: - target(args.script, args.target) - -def pipe(script, tmpdir): - print_banner(ERROR, line3='Pipe Mode') - while True: - try: - p = Pipes(tmpdir) - except KeyboardInterrupt: - break - runscript(script, Comm(p)) - del p - -def target(script, target): - print_banner(STATUS, line3='Subprocess Mode') - runscript(script, Comm(Process(target))) - -def runscript(script, comm): - try: - ilog("Running Script...") - code = compile(open(script).read(), script, 'exec') - exec(code, {'io': comm, 'print': elog}) - ilog("Script Finished!") - return - except KeyboardInterrupt: - pass - except: - ilog(traceback.format_exc(), end='', color=ERROR) - finally: - comm.shutdown() - comm.readall() - gc.collect() - - ilog("Script Ended Early!", color=WARNING) -- cgit v1.2.3 From 67b5a873a2f6f506fc0450fcb22b58f3e6612dc0 Mon Sep 17 00:00:00 2001 From: Malfurious Date: Sat, 13 Jan 2024 14:59:20 -0500 Subject: sploit: Expose modules' contents through package This completes the overarching package reorganization changes. The contents of the top-level "sploit" package's direct children modules are exported via the package. Explicit imports for sploit's subpackages are not necessary. Other package __init__.py files are using relative imports. However, doing so here causes the hatchling build process to fail. Fortunately, since this is the top level, absolute paths aren't too long. The last few modules left in this package have been kept back since they lack any specific niche, are considered "universally relevant", or are typically imported so frequently that it makes sense to provide them to scripts automatically. Signed-off-by: Malfurious --- sploit/__init__.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'sploit') diff --git a/sploit/__init__.py b/sploit/__init__.py index f443fe9..dc5943f 100644 --- a/sploit/__init__.py +++ b/sploit/__init__.py @@ -1,11 +1,6 @@ -from sploit import ( - arch, - comm, - payload, - rev, - symtbl, - until, - util, -) +from sploit.arch import * +from sploit.symtbl import * +from sploit.until import * -__version__ = util.git_version() +from sploit.util import git_version as __git_version +__version__ = __git_version() -- cgit v1.2.3 From 221742f7c5c89dc50ec4374bed5d2ccc0d7534bf Mon Sep 17 00:00:00 2001 From: Malfurious Date: Sat, 13 Jan 2024 15:03:30 -0500 Subject: main: Automatically provide top-level sploit modules to user scripts Signed-off-by: Malfurious --- sploit/__main__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'sploit') diff --git a/sploit/__main__.py b/sploit/__main__.py index 5b694a2..5d53ca6 100644 --- a/sploit/__main__.py +++ b/sploit/__main__.py @@ -46,11 +46,19 @@ def target(script, target): print_banner(STATUS, line3='Subprocess Mode') runscript(script, Comm(Process(target))) +def user_scope(comm): + import sploit as lib + scope = { name: getattr(lib, name) for name in dir(lib) } + scope['__version__'] = __version__ + scope['print'] = elog + scope['io'] = comm + return scope + def runscript(script, comm): try: ilog("Running Script...") code = compile(open(script).read(), script, 'exec') - exec(code, {'io': comm, 'print': elog}) + exec(code, user_scope(comm)) ilog("Script Finished!") return except KeyboardInterrupt: -- cgit v1.2.3