Adding a deamon argument

This commit is contained in:
Zoe Roux
2020-11-14 01:16:01 +01:00
parent e721ac37dc
commit 8e981861e2
7 changed files with 50 additions and 46 deletions
+6 -2
View File
@@ -20,16 +20,20 @@ def main(argv=None):
from argparse import ArgumentParser
parser = ArgumentParser(description="Easily run advanced pipelines in a daemon or in one run sessions.")
parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+", dest="coord")
parser.add_argument("coordinator", help="The name of your pipeline coordinator.", nargs="+")
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {version}")
parser.add_argument("-v", "--verbose", choices=["debug", "info", "warn", "error"], nargs="?", const="info",
default="warn", dest="log_level", metavar="loglevel",
help="Set the logging level.", type=str.lower)
parser.add_argument("-d", "--daemon", help="Enable the daemon mode (rerun input generators after a sleep cooldown",
action="store_true")
args = parser.parse_args(argv if argv is not None else sys.argv[1:])
try:
global autopipe
autopipe = Autopipe(args.coord[0], args.coord[1:], log_level=getattr(logging, args.log_level.upper()))
autopipe = Autopipe(args.coordinator[0], args.coordinator[1:],
log_level=getattr(logging, args.log_level.upper()),
daemon=args.daemon)
return 0
except ArgumentError as e:
print(str(e), file=stderr)
+4 -2
View File
@@ -7,7 +7,9 @@ from autopipe import APData, Coordinator, ArgumentError, Output, Pipe
class Autopipe:
def __init__(self, coordinator, coordinator_args, log_level=logging.WARNING):
def __init__(self, coordinator, coordinator_args,
log_level=logging.WARNING,
daemon=False):
logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)
self.interceptors = []
@@ -21,7 +23,7 @@ class Autopipe:
while True:
self.process_coordinator()
sleep_time = self.coordinator.get_input().loop_cooldown
if sleep_time <= 0:
if sleep_time <= 0 or not daemon:
logging.info("Input generator finished. Closing now.")
break
logging.info(f"Input generator finished. Starting again in {sleep_time} seconds.")
+1 -1
View File
@@ -11,7 +11,7 @@ class DownloadExample(Coordinator):
@classmethod
def name(cls):
return "NotifyExample"
return "DownloadExample"
@property
def pipeline(self) -> List[Union[Pipe, Callable[..., APData]]]:
+4 -7
View File
@@ -13,17 +13,14 @@ class APData(ABC):
class Pipe(ABC):
def __init__(self):
logging.info(f"Entering pipe: {self.name}")
@property
@abstractmethod
def name(self):
raise NotImplementedError
@abstractmethod
def pipe(self, data: APData) -> APData:
raise NotImplementedError
logging.info(f"Entering pipe: {self.name}")
return data
def __call__(self, data: APData) -> APData:
return self.pipe(data)
@@ -57,7 +54,6 @@ class Input(ABC):
class Output(Pipe):
def __init__(self, pipe: Union[Pipe, Callable[[APData], APData], APData] = None):
super().__init__()
if callable(pipe):
self.pipe = pipe
self.output = None
@@ -71,9 +67,10 @@ class Output(Pipe):
if self.output:
return "Static output"
raise NotImplementedError
return self.pipe.name
return self.pipe.name if isinstance(self.pipe, Pipe) else self.pipe.__name__
def pipe(self, data: APData) -> APData:
super().pipe(data)
if self.pipe is None:
if self.output:
return self.output
+33 -32
View File
@@ -20,39 +20,40 @@ class DownloaderPipe(Pipe):
return "Downloader"
def pipe(self, data: FileData) -> FileData:
super().pipe(data)
if data.is_local:
return data
if not force_refresh and os.path.isfile(path):
if not read:
return
with open(path, "r") as f:
return StringIO(f.read())
if message:
print(message)
r = requests.get(url, stream=progress)
try:
Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
length = r.headers.get("content-length")
if progress and length:
local = 0
length = int(length)
for chunk in r.iter_content(chunk_size=4096):
f.write(chunk)
local += len(chunk)
per = 50 * local // length
print(f"\r [{'#' * per}{'-' * (50 - per)}] ({sizeof_fmt(local)}/{sizeof_fmt(length)}) \r",
end='', flush=True)
else:
f.write(r.content)
if read:
return StringIO(r.content.decode(encoding))
except KeyboardInterrupt:
os.remove(path)
if progress:
print()
print("Download cancelled")
raise
# if not force_refresh and os.path.isfile(path):
# if not read:
# return
# with open(path, "r") as f:
# return StringIO(f.read())
#
# if message:
# print(message)
# r = requests.get(url, stream=progress)
# try:
# Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
# with open(path, "wb") as f:
# length = r.headers.get("content-length")
# if progress and length:
# local = 0
# length = int(length)
# for chunk in r.iter_content(chunk_size=4096):
# f.write(chunk)
# local += len(chunk)
# per = 50 * local // length
# print(f"\r [{'#' * per}{'-' * (50 - per)}] ({sizeof_fmt(local)}/{sizeof_fmt(length)}) \r",
# end='', flush=True)
# else:
# f.write(r.content)
# if read:
# return StringIO(r.content.decode(encoding))
# except KeyboardInterrupt:
# os.remove(path)
# if progress:
# print()
# print("Download cancelled")
# raise
data.is_local = True
return data
+1 -1
View File
@@ -4,7 +4,6 @@ from autopipe.models import Pipe, APData
class FilterPipe(Pipe):
def __init__(self, filter: Callable[[APData], bool]):
super().__init__()
self.filter = filter
@property
@@ -12,4 +11,5 @@ class FilterPipe(Pipe):
return "Filter"
def pipe(self, data: APData) -> APData:
super().pipe(data)
return data if self.filter(data) else None
+1 -1
View File
@@ -3,7 +3,6 @@ from autopipe.models import Pipe, APData
class TeePipe(Pipe):
def __init__(self, *outputs):
super().__init__()
self.outputs = outputs
@property
@@ -11,6 +10,7 @@ class TeePipe(Pipe):
return "Tee"
def pipe(self, data: APData) -> APData:
super().pipe(data)
for output in self.outputs:
output.pipe(data)
return data