mirror of
https://github.com/zoriya/Autopipe.git
synced 2025-12-06 02:56:09 +00:00
Creating a rss input
This commit is contained in:
2
Pipfile
2
Pipfile
@@ -4,6 +4,6 @@ verify_ssl = true
|
||||
name = "pypi"
|
||||
|
||||
[packages]
|
||||
feedparser = "*"
|
||||
|
||||
[dev-packages]
|
||||
|
||||
|
||||
@@ -4,5 +4,4 @@ import logging
|
||||
class Autopipe:
|
||||
def __init__(self, coordinator, log_level=logging.WARNING):
|
||||
logging.basicConfig(format="%(levelname)s: %(message)s", level=log_level)
|
||||
print("Hello from autopipe")
|
||||
logging.info(f"Using coordinator: {coordinator}")
|
||||
|
||||
29
autopipe/input/rss.py
Normal file
29
autopipe/input/rss.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Generator, Callable
|
||||
from models import Input, APData
|
||||
import feedparser
|
||||
|
||||
|
||||
class RssInput(Input):
|
||||
def __init__(self, url: str, mapper: Callable[[[]], APData], start_from_now: bool = True):
|
||||
super().__init__()
|
||||
self.url = url
|
||||
self.mapper = mapper
|
||||
self.last_etag = None
|
||||
self.last_modified = datetime.now() if start_from_now else None
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return "Rss"
|
||||
|
||||
def generate(self) -> Generator[APData]:
|
||||
logging.debug(f"Pulling the rss feed at {self.url}, last etag: {self.last_etag}, modif: {self.last_modified}")
|
||||
feed = feedparser.parse(self.url, etag=self.last_etag, modified=self.last_modified)
|
||||
if feed.status != 304:
|
||||
for entry in feed.entries:
|
||||
yield self.mapper(entry)
|
||||
|
||||
@property
|
||||
def loop_cooldown(self) -> int:
|
||||
return 300
|
||||
45
autopipe/models.py
Normal file
45
autopipe/models.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Generator
|
||||
import logging
|
||||
|
||||
|
||||
class APData:
|
||||
def __init__(self):
|
||||
self.value = None
|
||||
self.type = None
|
||||
|
||||
|
||||
class Pipe(ABC):
|
||||
def __init__(self):
|
||||
logging.info(f"Entering pipe: {self.name}")
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def pipe(self, data: APData) -> APData:
|
||||
pass
|
||||
|
||||
|
||||
class Input(ABC):
|
||||
def __init__(self):
|
||||
logging.info(f"Starting input manager: {self.name}")
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def generate(self) -> Generator[APData]:
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
@property
|
||||
def loop_cooldown(self) -> int:
|
||||
"""
|
||||
If negative or 0, the input can't be chained and once the generator return the program will exit.
|
||||
If grater then 0, the generator will be called again after a sleep of x seconds where x is the return of this.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
Reference in New Issue
Block a user