|
import io |
|
import os |
|
import re |
|
import sys |
|
|
|
from ._core import Process |
|
|
|
|
|
|
|
|
|
BSD_STAT_PPID = 2 |
|
|
|
|
|
LINUX_STAT_PPID = 3 |
|
|
|
STAT_PATTERN = re.compile(r"\(.+\)|\S+") |
|
|
|
|
|
def detect_proc(): |
|
"""Detect /proc filesystem style. |
|
|
|
This checks the /proc/{pid} directory for possible formats. Returns one of |
|
the following as str: |
|
|
|
* `stat`: Linux-style, i.e. ``/proc/{pid}/stat``. |
|
* `status`: BSD-style, i.e. ``/proc/{pid}/status``. |
|
""" |
|
pid = os.getpid() |
|
for name in ("stat", "status"): |
|
if os.path.exists(os.path.join("/proc", str(pid), name)): |
|
return name |
|
raise ProcFormatError("unsupported proc format") |
|
|
|
|
|
def _use_bsd_stat_format(): |
|
try: |
|
return os.uname().sysname.lower() in ("freebsd", "netbsd", "dragonfly") |
|
except Exception: |
|
return False |
|
|
|
|
|
def _get_ppid(pid, name): |
|
path = os.path.join("/proc", str(pid), name) |
|
with io.open(path, encoding="ascii", errors="replace") as f: |
|
parts = STAT_PATTERN.findall(f.read()) |
|
|
|
if _use_bsd_stat_format(): |
|
return parts[BSD_STAT_PPID] |
|
return parts[LINUX_STAT_PPID] |
|
|
|
|
|
def _get_cmdline(pid): |
|
path = os.path.join("/proc", str(pid), "cmdline") |
|
encoding = sys.getfilesystemencoding() or "utf-8" |
|
with io.open(path, encoding=encoding, errors="replace") as f: |
|
|
|
|
|
|
|
|
|
return tuple(f.read().split("\0")[:-1]) |
|
|
|
|
|
class ProcFormatError(EnvironmentError): |
|
pass |
|
|
|
|
|
def iter_process_parents(pid, max_depth=10): |
|
"""Try to look up the process tree via the /proc interface.""" |
|
stat_name = detect_proc() |
|
|
|
|
|
|
|
|
|
def _iter_process_parents(pid, max_depth): |
|
for _ in range(max_depth): |
|
ppid = _get_ppid(pid, stat_name) |
|
args = _get_cmdline(pid) |
|
yield Process(args=args, pid=pid, ppid=ppid) |
|
if ppid == "0": |
|
break |
|
pid = ppid |
|
|
|
return _iter_process_parents(pid, max_depth) |
|
|