From 2d765d6bf2314eeb3cebdc90838c60fde28b88c7 Mon Sep 17 00:00:00 2001 From: Marcel Otte Date: Tue, 25 May 2021 21:32:32 +0200 Subject: [PATCH] Stream asyncio process output instead of retrieving as a whole --- backive/backive_service | 2 +- backive/core/backup.py | 75 +++++++++++++++++++++++++++++------------ 2 files changed, 54 insertions(+), 23 deletions(-) diff --git a/backive/backive_service b/backive/backive_service index dcd997f..ae0d481 100755 --- a/backive/backive_service +++ b/backive/backive_service @@ -54,7 +54,7 @@ class Backive: if await self._scheduler.should_run(backup.name): logging.info("Running backup '%s'", backup.name) result = await backup.run() - logging.debug("Result: %s", result.decode()) + logging.debug("Result: %s", result) await self._scheduler.register_run(backup.name) else: logging.info( diff --git a/backive/core/backup.py b/backive/core/backup.py index dff375a..22e0b47 100644 --- a/backive/core/backup.py +++ b/backive/core/backup.py @@ -17,6 +17,28 @@ class Backup: def get_frequency(self): return self.config.get("frequency", None) + + async def _read_stream(self, stream, cb): + while True: + line = await stream.readline() + if line: + cb(line) + else: + break + + async def stream_subprocess(self, cmd, outcb, errcb): + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + await asyncio.wait([ + self._read_stream(proc.stdout, outcb), + self._read_stream(proc.stderr, errcb) + ]) + + return await proc.wait() async def run(self): logging.debug("Running backup %s", self.name) @@ -31,31 +53,40 @@ class Backup: self.config.get("target_device") )).config.get("mountname") ) - proc = await asyncio.create_subprocess_shell( - """mkdir -p {}""".format( - os.path.join( - backup_env["BACKIVE_MOUNT"], - backup_env["BACKIVE_TO"] - ) - ), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + backup_env["BACKIVE_TARGET_DIR"] = os.path.join( + backup_env["BACKIVE_MOUNT"], + backup_env["BACKIVE_TO"] + ) + proc = await self.stream_subprocess( + """mkdir -p {}""".format( backup_env["BACKIVE_TARGET_DIR"]), + lambda x: logging.debug("STDOUT: %s", x), + lambda x: logging.debug("STDERR: %s", x), ) - stdout, stderr = await proc.communicate() - logging.debug("stdout: %s", stdout.decode()) - logging.debug("stderr: %s", stderr.decode()) +# proc = await asyncio.create_subprocess_shell( +# """mkdir -p {}""".format( backup_env["BACKIVE_TARGET_DIR"] +# ), +# stdout=asyncio.subprocess.PIPE, +# stderr=asyncio.subprocess.PIPE, +# ) +# stdout, stderr = await proc.communicate() +# logging.debug("stdout: %s", stdout.decode()) +# logging.debug("stderr: %s", stderr.decode()) user = self.config.get("user") - proc = await asyncio.create_subprocess_shell( +# proc = await asyncio.create_subprocess_shell( + proc = await self.stream_subprocess( # "set -x; chown -R {} ${{BACKIVE_MOUNT}}/${{BACKIVE_TO}};".format(user) + # "sudo -E -u {} sh -c '".format(user) + self.config.get("script"), -# "'", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - shell=True, - env=backup_env + lambda x: logging.debug("STDOUT: %s", x), + lambda x: logging.debug("STDERR: %s", x), ) - stdout, stderr = await proc.communicate() - logging.debug("stdout: %s", stdout.decode()) - logging.debug("stderr: %s", stderr.decode()) - return stdout +# "'", +# stdout=asyncio.subprocess.PIPE, +# stderr=asyncio.subprocess.PIPE, +# shell=True, +# env=backup_env +# ) +# stdout, stderr = await proc.communicate() +# logging.debug("stdout: %s", stdout.decode()) +# logging.debug("stderr: %s", stderr.decode()) + return "done"