File size: 4,516 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import contextlib
import ctypes
import os
from ctypes.wintypes import (
BOOL,
CHAR,
DWORD,
HANDLE,
LONG,
LPWSTR,
MAX_PATH,
PDWORD,
ULONG,
)
from shellingham._core import SHELL_NAMES
INVALID_HANDLE_VALUE = HANDLE(-1).value
ERROR_NO_MORE_FILES = 18
ERROR_INSUFFICIENT_BUFFER = 122
TH32CS_SNAPPROCESS = 2
PROCESS_QUERY_LIMITED_INFORMATION = 0x1000
kernel32 = ctypes.windll.kernel32
def _check_handle(error_val=0):
def check(ret, func, args):
if ret == error_val:
raise ctypes.WinError()
return ret
return check
def _check_expected(expected):
def check(ret, func, args):
if ret:
return True
code = ctypes.GetLastError()
if code == expected:
return False
raise ctypes.WinError(code)
return check
class ProcessEntry32(ctypes.Structure):
_fields_ = (
("dwSize", DWORD),
("cntUsage", DWORD),
("th32ProcessID", DWORD),
("th32DefaultHeapID", ctypes.POINTER(ULONG)),
("th32ModuleID", DWORD),
("cntThreads", DWORD),
("th32ParentProcessID", DWORD),
("pcPriClassBase", LONG),
("dwFlags", DWORD),
("szExeFile", CHAR * MAX_PATH),
)
kernel32.CloseHandle.argtypes = [HANDLE]
kernel32.CloseHandle.restype = BOOL
kernel32.CreateToolhelp32Snapshot.argtypes = [DWORD, DWORD]
kernel32.CreateToolhelp32Snapshot.restype = HANDLE
kernel32.CreateToolhelp32Snapshot.errcheck = _check_handle( # type: ignore
INVALID_HANDLE_VALUE,
)
kernel32.Process32First.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)]
kernel32.Process32First.restype = BOOL
kernel32.Process32First.errcheck = _check_expected( # type: ignore
ERROR_NO_MORE_FILES,
)
kernel32.Process32Next.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)]
kernel32.Process32Next.restype = BOOL
kernel32.Process32Next.errcheck = _check_expected( # type: ignore
ERROR_NO_MORE_FILES,
)
kernel32.GetCurrentProcessId.argtypes = []
kernel32.GetCurrentProcessId.restype = DWORD
kernel32.OpenProcess.argtypes = [DWORD, BOOL, DWORD]
kernel32.OpenProcess.restype = HANDLE
kernel32.OpenProcess.errcheck = _check_handle( # type: ignore
INVALID_HANDLE_VALUE,
)
kernel32.QueryFullProcessImageNameW.argtypes = [HANDLE, DWORD, LPWSTR, PDWORD]
kernel32.QueryFullProcessImageNameW.restype = BOOL
kernel32.QueryFullProcessImageNameW.errcheck = _check_expected( # type: ignore
ERROR_INSUFFICIENT_BUFFER,
)
@contextlib.contextmanager
def _handle(f, *args, **kwargs):
handle = f(*args, **kwargs)
try:
yield handle
finally:
kernel32.CloseHandle(handle)
def _iter_processes():
f = kernel32.CreateToolhelp32Snapshot
with _handle(f, TH32CS_SNAPPROCESS, 0) as snap:
entry = ProcessEntry32()
entry.dwSize = ctypes.sizeof(entry)
ret = kernel32.Process32First(snap, entry)
while ret:
yield entry
ret = kernel32.Process32Next(snap, entry)
def _get_full_path(proch):
size = DWORD(MAX_PATH)
while True:
path_buff = ctypes.create_unicode_buffer("", size.value)
if kernel32.QueryFullProcessImageNameW(proch, 0, path_buff, size):
return path_buff.value
size.value *= 2
def get_shell(pid=None, max_depth=10):
proc_map = {
proc.th32ProcessID: (proc.th32ParentProcessID, proc.szExeFile)
for proc in _iter_processes()
}
pid = pid or os.getpid()
for _ in range(0, max_depth + 1):
try:
ppid, executable = proc_map[pid]
except KeyError: # No such process? Give up.
break
# The executable name would be encoded with the current code page if
# we're in ANSI mode (usually). Try to decode it into str/unicode,
# replacing invalid characters to be safe (not thoeratically necessary,
# I think). Note that we need to use 'mbcs' instead of encoding
# settings from sys because this is from the Windows API, not Python
# internals (which those settings reflect). (pypa/pipenv#3382)
if isinstance(executable, bytes):
executable = executable.decode("mbcs", "replace")
name = executable.rpartition(".")[0].lower()
if name not in SHELL_NAMES:
pid = ppid
continue
key = PROCESS_QUERY_LIMITED_INFORMATION
with _handle(kernel32.OpenProcess, key, 0, pid) as proch:
return (name, _get_full_path(proch))
return None
|