# scheduler.py
import asyncio
import logging
from typing import Dict

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from config import SETTINGS, load_feeds
from fetch_news import NewsFetcher
from filter_news import NewsFilter
from telegram_bot import TelegramBot

logger = logging.getLogger(__name__)

class NewsScheduler:
    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.processed_links = set()
        self.max_history = 1000
        self.filter = NewsFilter()
        self.bot = TelegramBot()
        self.running = False

    async def start(self):
        if not self.running:
            self.scheduler.start()
            self.running = True
            # Drain the bot's outgoing message queue in the background.
            asyncio.create_task(self.bot.process_queue())
            logger.info("Scheduler started with %d jobs", len(self.scheduler.get_jobs()))

    async def shutdown(self):
        if self.running:
            # AsyncIOScheduler.shutdown() is synchronous, not a coroutine.
            self.scheduler.shutdown()
            await self.bot.session.close()
            self.running = False

    async def job_wrapper(self, feed: Dict):
        try:
            async with NewsFetcher() as fetcher:
                if feed["type"] == "rss":
                    articles = await fetcher.process_rss(feed, self.processed_links)
                else:
                    articles = await fetcher.get_news_api(feed, self.processed_links)

            filtered = self.filter.filter_articles(articles)
            for article in filtered:
                await self.bot.message_queue.put(article)

            self.cleanup_processed_links()
        except Exception as e:
            logger.error("Job for %s failed: %s", feed["name"], e)

    def cleanup_processed_links(self):
        # Sets are unordered, so this discards arbitrary entries; it caps
        # memory use but does not guarantee the most recent links are kept.
        if len(self.processed_links) > self.max_history:
            self.processed_links = set(list(self.processed_links)[-self.max_history:])

    def add_jobs(self):
        for feed in load_feeds():
            # Per-feed interval, falling back to the global check interval.
            trigger = IntervalTrigger(
                minutes=feed.get("interval", SETTINGS["check_interval"])
            )
            self.scheduler.add_job(
                self.job_wrapper,
                trigger=trigger,
                kwargs={"feed": feed},
                max_instances=3,
                name=f"Feed: {feed['name']}"
            )
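

# Example entrypoint: a minimal sketch, assuming load_feeds() returns feed
# dicts with the "name", "type", and optional "interval" keys used above,
# and that TelegramBot.process_queue() consumes message_queue indefinitely.
async def main():
    scheduler = NewsScheduler()
    scheduler.add_jobs()
    await scheduler.start()
    try:
        # Block forever; APScheduler fires the interval jobs on this loop.
        await asyncio.Event().wait()
    finally:
        await scheduler.shutdown()


if __name__ == "__main__":
    asyncio.run(main())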