Creating a good coordinator loader & a pipeline selector

This commit is contained in:
Zoe Roux
2020-11-12 23:15:13 +01:00
parent 19ff9ff7fc
commit cf3fc9ef48
8 changed files with 124 additions and 19 deletions
+4 -4
View File
@@ -1,12 +1,12 @@
import logging
from datetime import datetime
from typing import Generator, Callable
from models import Input, APData
from typing import Generator, Callable, List
from autopipe import Input, APData
import feedparser
class RssInput(Input):
def __init__(self, url: str, mapper: Callable[[[]], APData], start_from_now: bool = True):
def __init__(self, url: str, mapper: Callable[[List], APData], start_from_now: bool = True):
super().__init__()
self.url = url
self.mapper = mapper
@@ -17,7 +17,7 @@ class RssInput(Input):
def name(self):
return "Rss"
def generate(self) -> Generator[APData]:
def generate(self) -> Generator[APData, None, None]:
logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}")
feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified)
if feed.status != 304: