Dunevhhhh committed
Commit 17ad0be · verified · 1 Parent(s): e12cb2c

Update scheduler.py

Files changed (1):
  scheduler.py  +40 -20
scheduler.py CHANGED
@@ -1,8 +1,12 @@
-# scheduler.py (extended)
+# scheduler.py
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 from apscheduler.triggers.interval import IntervalTrigger
 from config import SETTINGS, load_feeds
 import logging
+from fetch_news import NewsFetcher
+from filter_news import NewsFilter
+from telegram_bot import TelegramBot
+import asyncio

 logger = logging.getLogger(__name__)

@@ -11,34 +15,50 @@ class NewsScheduler:
         self.scheduler = AsyncIOScheduler()
         self.processed_links = set()
         self.max_history = 1000
+        self.filter = NewsFilter()
+        self.bot = TelegramBot()
+        self.running = False

-    def cleanup_processed_links(self):
-        # Prevent the set from growing too large
-        if len(self.processed_links) > self.max_history:
-            self.processed_links = set(list(self.processed_links)[-self.max_history:])
+    async def start(self):
+        if not self.running:
+            self.scheduler.start()
+            self.running = True
+            asyncio.create_task(self.bot.process_queue())
+            logger.info("Scheduler started with %d jobs", len(self.scheduler.get_jobs()))

-    async def job_wrapper(self):
+    async def shutdown(self):
+        if self.running:
+            self.scheduler.shutdown()
+            await self.bot.session.close()
+            self.running = False
+
+    async def job_wrapper(self, feed: dict):
         try:
-            await process_and_send_news()
+            async with NewsFetcher() as fetcher:
+                articles = await fetcher.process_rss(feed, self.processed_links) if feed["type"] == "rss" \
+                    else await fetcher.get_news_api(feed, self.processed_links)
+
+            filtered = self.filter.filter_articles(articles)
+            for article in filtered:
+                await self.bot.message_queue.put(article)
+
             self.cleanup_processed_links()
         except Exception as e:
-            logger.error(f"Scheduler job failed: {str(e)}")
+            logger.error(f"Job for {feed['name']} failed: {str(e)}")

-    def start(self):
-        feeds = load_feeds()
-        # Create jobs dynamically based on the feed configuration
-        for feed in feeds:
+    def cleanup_processed_links(self):
+        if len(self.processed_links) > self.max_history:
+            self.processed_links = set(list(self.processed_links)[-self.max_history:])
+
+    def add_jobs(self):
+        for feed in load_feeds():
             trigger = IntervalTrigger(
                 minutes=feed.get("interval", SETTINGS["check_interval"])
+            )
             self.scheduler.add_job(
                 self.job_wrapper,
                 trigger=trigger,
                 kwargs={"feed": feed},
-                max_instances=3
-            )
-        self.scheduler.start()
-        logger.info("Scheduler started with %d jobs", len(self.scheduler.get_jobs()))
-
-async def process_and_send_news(feed=None):
-    # Implementation goes here
-    pass
+                max_instances=3,
+                name=f"Feed: {feed['name']}"
+            )
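
Usage note (not part of the commit): the sketch below shows one way the revised NewsScheduler could be driven. It relies only on what the diff itself implies — add_jobs() registers one interval job per feed, start() launches APScheduler plus the bot's queue consumer, shutdown() tears both down — while the main() wrapper, the logging setup and the EXAMPLE_FEED dict (keys "name", "type", "interval", as used by job_wrapper and add_jobs) are illustrative assumptions; the real entries come from config.load_feeds().

# example_main.py (hypothetical driver, not part of this commit)
import asyncio
import logging

from scheduler import NewsScheduler

logging.basicConfig(level=logging.INFO)

# Shape of a feed entry as implied by job_wrapper()/add_jobs(); real entries
# are loaded via config.load_feeds().
EXAMPLE_FEED = {
    "name": "Example RSS",   # used for the job name and error logging
    "type": "rss",           # "rss" -> process_rss(), anything else -> get_news_api()
    "interval": 15,          # minutes; defaults to SETTINGS["check_interval"]
}

async def main() -> None:
    scheduler = NewsScheduler()
    scheduler.add_jobs()        # register one interval job per configured feed
    await scheduler.start()     # start APScheduler and the bot's queue consumer
    try:
        await asyncio.Event().wait()   # keep the loop alive; jobs run in the background
    finally:
        await scheduler.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Note that add_jobs() has to be called before start(); unlike the old start(), the new one no longer reads the feed configuration itself.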