Add typing

This commit is contained in:
2022-05-07 15:46:32 +02:00
parent b827ebf4c3
commit a221e378bf

View File

@@ -4,6 +4,7 @@ import ctypes
import ctypes.util import ctypes.util
import distutils.sysconfig import distutils.sysconfig
from functools import wraps from functools import wraps
from typing import Any, Callable, List, Tuple, Union
import pkg_resources import pkg_resources
import inspect import inspect
import os.path import os.path
@@ -498,7 +499,7 @@ class UcCleanupManager(object):
class Uc(object): class Uc(object):
_cleanup = UcCleanupManager() _cleanup = UcCleanupManager()
def __init__(self, arch, mode): def __init__(self, arch: int, mode: int):
# verify version compatibility with the core before doing anything # verify version compatibility with the core before doing anything
(major, minor, _combined) = uc_version() (major, minor, _combined) = uc_version()
# print("core version =", uc_version()) # print("core version =", uc_version())
@@ -522,7 +523,7 @@ class Uc(object):
self._hook_exception = None # The exception raised in a hook self._hook_exception = None # The exception raised in a hook
@staticmethod @staticmethod
def release_handle(uch): def release_handle(uch: ctypes.CDLL):
if uch: if uch:
try: try:
status = _uc.uc_close(uch) status = _uc.uc_close(uch)
@@ -532,7 +533,7 @@ class Uc(object):
pass pass
# emulate from @begin, and stop when reaching address @until # emulate from @begin, and stop when reaching address @until
def emu_start(self, begin, until, timeout=0, count=0): def emu_start(self, begin: int, until: int, timeout: int=0, count: int=0):
self._hook_exception = None self._hook_exception = None
status = _uc.uc_emu_start(self._uch, begin, until, timeout, count) status = _uc.uc_emu_start(self._uch, begin, until, timeout, count)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
@@ -547,24 +548,24 @@ class Uc(object):
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
# return the value of a register # return the value of a register, for @opt parameter, specify int for x86 msr, tuple for arm cp/neon regs.
def reg_read(self, reg_id, opt=None): def reg_read(self, reg_id: int, opt: Union[None, int, Tuple]=None):
return reg_read(functools.partial(_uc.uc_reg_read, self._uch), self._arch, reg_id, opt) return reg_read(functools.partial(_uc.uc_reg_read, self._uch), self._arch, reg_id, opt)
# write to a register # write to a register, tuple for arm cp regs.
def reg_write(self, reg_id, value): def reg_write(self, reg_id: Union[int, Tuple], value: int):
return reg_write(functools.partial(_uc.uc_reg_write, self._uch), self._arch, reg_id, value) return reg_write(functools.partial(_uc.uc_reg_write, self._uch), self._arch, reg_id, value)
# read from MSR - X86 only # read from MSR - X86 only
def msr_read(self, msr_id): def msr_read(self, msr_id: int):
return self.reg_read(x86_const.UC_X86_REG_MSR, msr_id) return self.reg_read(x86_const.UC_X86_REG_MSR, msr_id)
# write to MSR - X86 only # write to MSR - X86 only
def msr_write(self, msr_id, value): def msr_write(self, msr_id, value: int):
return self.reg_write(x86_const.UC_X86_REG_MSR, (msr_id, value)) return self.reg_write(x86_const.UC_X86_REG_MSR, (msr_id, value))
# read data from memory # read data from memory
def mem_read(self, address, size): def mem_read(self, address: int, size: int):
data = ctypes.create_string_buffer(size) data = ctypes.create_string_buffer(size)
status = _uc.uc_mem_read(self._uch, address, data, size) status = _uc.uc_mem_read(self._uch, address, data, size)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
@@ -572,7 +573,7 @@ class Uc(object):
return bytearray(data) return bytearray(data)
# write to memory # write to memory
def mem_write(self, address, data): def mem_write(self, address: int, data: Union[bytes, bytearray]):
status = _uc.uc_mem_write(self._uch, address, data, len(data)) status = _uc.uc_mem_write(self._uch, address, data, len(data))
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
@@ -585,7 +586,9 @@ class Uc(object):
(cb, data) = self._callbacks[user_data] (cb, data) = self._callbacks[user_data]
cb(self, offset, size, value, data) cb(self, offset, size, value, data)
def mmio_map(self, address, size, read_cb, user_data_read, write_cb, user_data_write): def mmio_map(self, address: int, size: int,
read_cb: Callable[["Uc", int, int, Any], int], user_data_read: Any,
write_cb: Callable[["Uc", int, int, int, Any], None], user_data_write: Any):
internal_read_cb = ctypes.cast(UC_MMIO_READ_CB(self._mmio_map_read_cb), UC_MMIO_READ_CB) internal_read_cb = ctypes.cast(UC_MMIO_READ_CB(self._mmio_map_read_cb), UC_MMIO_READ_CB)
internal_write_cb = ctypes.cast(UC_MMIO_WRITE_CB(self._mmio_map_write_cb), UC_MMIO_WRITE_CB) internal_write_cb = ctypes.cast(UC_MMIO_WRITE_CB(self._mmio_map_write_cb), UC_MMIO_WRITE_CB)
@@ -605,31 +608,31 @@ class Uc(object):
self._ctype_cbs.append(internal_write_cb) self._ctype_cbs.append(internal_write_cb)
# map a range of memory # map a range of memory
def mem_map(self, address, size, perms=uc.UC_PROT_ALL): def mem_map(self, address: int, size: int, perms: int=uc.UC_PROT_ALL):
status = _uc.uc_mem_map(self._uch, address, size, perms) status = _uc.uc_mem_map(self._uch, address, size, perms)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
# map a range of memory from a raw host memory address # map a range of memory from a raw host memory address
def mem_map_ptr(self, address, size, perms, ptr): def mem_map_ptr(self, address: int, size: int, perms: int, ptr: int):
status = _uc.uc_mem_map_ptr(self._uch, address, size, perms, ptr) status = _uc.uc_mem_map_ptr(self._uch, address, size, perms, ptr)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
# unmap a range of memory # unmap a range of memory
def mem_unmap(self, address, size): def mem_unmap(self, address: int, size: int):
status = _uc.uc_mem_unmap(self._uch, address, size) status = _uc.uc_mem_unmap(self._uch, address, size)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
# protect a range of memory # protect a range of memory
def mem_protect(self, address, size, perms=uc.UC_PROT_ALL): def mem_protect(self, address: int, size: int, perms: int=uc.UC_PROT_ALL):
status = _uc.uc_mem_protect(self._uch, address, size, perms) status = _uc.uc_mem_protect(self._uch, address, size, perms)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
# return CPU mode at runtime # return CPU mode at runtime
def query(self, query_mode): def query(self, query_mode: int):
result = ctypes.c_size_t(0) result = ctypes.c_size_t(0)
status = _uc.uc_query(self._uch, query_mode, ctypes.byref(result)) status = _uc.uc_query(self._uch, query_mode, ctypes.byref(result))
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
@@ -704,7 +707,7 @@ class Uc(object):
(cb, data) = self._callbacks[user_data] (cb, data) = self._callbacks[user_data]
cb(self, data) cb(self, data)
def ctl(self, control, *args): def ctl(self, control: int, *args):
status = _uc.uc_ctl(self._uch, control, *args) status = _uc.uc_ctl(self._uch, control, *args)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
@@ -748,7 +751,7 @@ class Uc(object):
def ctl_get_page_size(self): def ctl_get_page_size(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_PAGE_SIZE, ctypes.c_uint32) return self.__ctl_r_1_arg(uc.UC_CTL_UC_PAGE_SIZE, ctypes.c_uint32)
def ctl_set_page_size(self, val): def ctl_set_page_size(self, val: int):
self.__ctl_w_1_arg(uc.UC_CTL_UC_PAGE_SIZE, val, ctypes.c_uint32) self.__ctl_w_1_arg(uc.UC_CTL_UC_PAGE_SIZE, val, ctypes.c_uint32)
def ctl_get_arch(self): def ctl_get_arch(self):
@@ -757,7 +760,7 @@ class Uc(object):
def ctl_get_timeout(self): def ctl_get_timeout(self):
return self.__ctl_r_1_arg(uc.UC_CTL_UC_TIMEOUT, ctypes.c_uint64) return self.__ctl_r_1_arg(uc.UC_CTL_UC_TIMEOUT, ctypes.c_uint64)
def ctl_exits_enabled(self, val): def ctl_exits_enabled(self, val: bool):
self.__ctl_w_1_arg(uc.UC_CTL_UC_USE_EXITS, val, ctypes.c_int) self.__ctl_w_1_arg(uc.UC_CTL_UC_USE_EXITS, val, ctypes.c_int)
def ctl_get_exits_cnt(self): def ctl_get_exits_cnt(self):
@@ -769,7 +772,7 @@ class Uc(object):
self.ctl(self.__ctl_r(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(l)) self.ctl(self.__ctl_r(uc.UC_CTL_UC_EXITS, 2), ctypes.cast(arr, ctypes.c_void_p), ctypes.c_size_t(l))
return [i for i in arr] return [i for i in arr]
def ctl_set_exits(self, exits): def ctl_set_exits(self, exits: List[int]):
arr = (ctypes.c_uint64 * len(exits))() arr = (ctypes.c_uint64 * len(exits))()
for idx, exit in enumerate(exits): for idx, exit in enumerate(exits):
arr[idx] = exit arr[idx] = exit
@@ -778,20 +781,20 @@ class Uc(object):
def ctl_get_cpu_model(self): def ctl_get_cpu_model(self):
return self.__ctl_r_1_arg(uc.UC_CTL_CPU_MODEL, ctypes.c_int) return self.__ctl_r_1_arg(uc.UC_CTL_CPU_MODEL, ctypes.c_int)
def ctl_set_cpu_model(self, val): def ctl_set_cpu_model(self, val: int):
self.__ctl_w_1_arg(uc.UC_CTL_CPU_MODEL, val, ctypes.c_int) self.__ctl_w_1_arg(uc.UC_CTL_CPU_MODEL, val, ctypes.c_int)
def ctl_remove_cache(self, addr, len): def ctl_remove_cache(self, addr: int, end: int):
self.__ctl_w_2_arg(uc.UC_CTL_TB_REMOVE_CACHE, addr, len, ctypes.c_uint64, ctypes.c_uint64) self.__ctl_w_2_arg(uc.UC_CTL_TB_REMOVE_CACHE, addr, end, ctypes.c_uint64, ctypes.c_uint64)
def ctl_request_cache(self, addr): def ctl_request_cache(self, addr: int):
return self.__ctl_rw_1_1_arg(uc.UC_CTL_TB_REQUEST_CACHE, addr, ctypes.c_uint64, uc_tb) return self.__ctl_rw_1_1_arg(uc.UC_CTL_TB_REQUEST_CACHE, addr, ctypes.c_uint64, uc_tb)
def ctl_flush_tb(self): def ctl_flush_tb(self):
self.ctl(self.__ctl_w(uc.UC_CTL_TB_FLUSH, 0)) self.ctl(self.__ctl_w(uc.UC_CTL_TB_FLUSH, 0))
# add a hook # add a hook
def hook_add(self, htype, callback, user_data=None, begin=1, end=0, arg1=0, arg2=0): def hook_add(self, htype: int, callback: Callable, user_data: Any=None, begin: int=1, end: int=0, arg1: int=0, arg2: int=0):
_h2 = uc_hook_h() _h2 = uc_hook_h()
# save callback & user_data # save callback & user_data
@@ -883,7 +886,7 @@ class Uc(object):
return _h2.value return _h2.value
# delete a hook # delete a hook
def hook_del(self, h): def hook_del(self, h: int):
_h = uc_hook_h(h) _h = uc_hook_h(h)
status = _uc.uc_hook_del(self._uch, _h) status = _uc.uc_hook_del(self._uch, _h)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
@@ -898,12 +901,12 @@ class Uc(object):
return context return context
def context_update(self, context): def context_update(self, context: "UcContext"):
status = _uc.uc_context_save(self._uch, context.context) status = _uc.uc_context_save(self._uch, context.context)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)
def context_restore(self, context): def context_restore(self, context: "UcContext"):
status = _uc.uc_context_restore(self._uch, context.context) status = _uc.uc_context_restore(self._uch, context.context)
if status != uc.UC_ERR_OK: if status != uc.UC_ERR_OK:
raise UcError(status) raise UcError(status)