File size: 2,168 Bytes
618430a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from asyncio import create_subprocess_exec, subprocess


class Terminal:
    """
    Class for running terminal commands asynchronously.

    Methods:

        run(commands: str)
            commands: Terminal Commands.
            Returns Process id (int)

        terminate(pid: int)
            pid: Process id returned in `run` method.
            Returns True if terminated else False (bool)

        output(pid: int)
            pid: Process id returned in `run` method.
            Returns Output of process (str)

        error(pid: int)
            pid: Process id returned in `run` method.
            Returns Error of process (str)
    """

    def __init__(self) -> None:
        self._processes = {}

    @staticmethod
    def _to_str(data: bytes) -> str:
        return data.decode("utf-8").strip()

    async def run(self, *args) -> int:
        process = await create_subprocess_exec(
            *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        pid = process.pid
        self._processes[pid] = process
        return pid

    def terminate(self, pid: int) -> bool:
        try:
            self._processes.pop(pid)
            self._processes[pid].kill()
            return True
        except KeyError:
            return False

    async def output(self, pid: int) -> str:
        output = []
        while True:
            out = self._to_str(await self._processes[pid].stdout.readline())
            if not out:
                break
            output.append(out)
        return "\n".join(output)

    async def error(self, pid: int) -> str:
        error = []
        while True:
            err = self._to_str(await self._processes[pid].stderr.readline())
            if not err:
                break
            error.append(err)
        return "\n".join(error)

    @property
    def _auto_remove_processes(self) -> None:
        while self._processes:
            for proc in self._processes.keys():
                if proc.returncode is not None:  # process is still running
                    try:
                        self._processes.pop(proc)
                    except KeyError:
                        pass