import gradio as gr
import requests
from bs4 import BeautifulSoup
import pytz
from datetime import datetime, timedelta
import logging
import traceback
from typing import List, Dict, Any
import hashlib
import icalendar
import uuid
import re
import json
import os

# Hugging Face imports
try:
    from transformers import AutoModelForCausalLM, AutoTokenizer
    import torch
    TRANSFORMERS_AVAILABLE = True  # set to False to skip the local model and fall back to the Inference Client
except ImportError:
    TRANSFORMERS_AVAILABLE = False

# Hugging Face Inference Client
from huggingface_hub import InferenceClient

class EventScraper:
    def __init__(self, urls, timezone='Europe/Berlin'):
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Timezone setup
        self.timezone = pytz.timezone(timezone)
        
        # URLs to scrape
        self.urls = urls if isinstance(urls, list) else [urls]
        
        # Event cache to prevent duplicates
        self.event_cache = set()
        
        # iCal calendar (PRODID and VERSION are required calendar properties per RFC 5545)
        self.calendar = icalendar.Calendar()
        self.calendar.add('prodid', '-//Event Scraper//example.com//')
        self.calendar.add('version', '2.0')
        
        # Model and tokenizer will be loaded on first use
        self.model = None
        self.tokenizer = None
        self.client = None
    
    def setup_llm(self):
        """Setup Hugging Face LLM for event extraction"""
        # Try local model first
        if TRANSFORMERS_AVAILABLE:
            try:
                model_name = "meta-llama/Llama-3.2-1B-Instruct" # 3B is very slow on HF :(
                self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                self.model = AutoModelForCausalLM.from_pretrained(
                    model_name, 
                    torch_dtype=torch.float16,
                    return_dict_in_generate=False,
                    device_map='auto'
                )
                return
            except Exception as local_err:
                gr.Warning(f"Local model setup failed: {str(local_err)}")
        
        # Fallback to Inference Client
        try:
            # Try to get Hugging Face token from environment
            hf_token = os.getenv('HF_TOKEN')
            
            # Setup Inference Client
            if hf_token:
                self.client = InferenceClient(
                    model="meta-llama/Llama-3.2-3B-Instruct", 
                    token=hf_token 
                )
            else:
                # Public model access without token
                self.client = InferenceClient(
                    model="meta-llama/Llama-3.2-3B-Instruct" 
                )
        except Exception as e:
            gr.Warning(f"Inference Client setup error: {str(e)}")
            raise
    
    def generate_with_model(self, prompt):
        """Generate text using either local model or inference client"""
        print("------ PROMPT ------------")
        print(prompt)
        print("------ PROMPT ------------")
        if self.model and self.tokenizer:
            # Use local model
            inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
            outputs = self.model.generate(
                inputs.input_ids, 
                max_new_tokens=12000, 
                do_sample=True, 
                temperature=0.9
            )
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        elif self.client:
            # Use Inference Client
            return self.client.text_generation(
                prompt, 
                max_new_tokens=2000, 
                temperature=0.9
            )
        
        else:
            raise ValueError("No model or client available for text generation")
    
    def fetch_webpage_content(self, url):
        """Fetch webpage content"""
        try:
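            # Present a desktop-browser User-Agent so simple bot filters don't reject the request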
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            gr.Warning(f"Error fetching {url}: {str(e)}")
            return ""
    
    def extract_text_from_html(self, html_content):
        """Extract readable text from HTML"""
        soup = BeautifulSoup(html_content, 'html.parser')
        
        for script in soup(["script", "style", "nav", "header", "footer"]):
            script.decompose()
        
        text = soup.get_text(separator=' ', strip=True)
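        # Keep only the first ~2000 words so the prompt stays manageable for the model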
        return ' '.join(text.split()[:2000])
    
    def generate_event_extraction_prompt(self, text):
        """Create prompt for LLM to extract event details"""

        prompt=f'''
        <|start_header_id|>system<|end_header_id|>
        
        <|eot_id|><|start_header_id|>user<|end_header_id|>
        You are an event extraction assistant. 
        Find and extract all events from the following text. 
        For each event, provide:
        - Exact event name
        - Date (DD.MM.YYYY)
        - Time (HH:MM if available)
        - Location
        - Short description

        Important: Extract ALL possible events. 
        Text to analyze:
        {text}

        Output ONLY a JSON list of events like this - Response Format:
        [
          {{
            "name": "Event Name",
            "date": "07.12.2024",
            "time": "19:00",
            "location": "Event Location",
            "description": "Event details"
          }}
        ]

        If NO events are found, return an empty list [].
        Only return the json. nothing else. no comments.<|eot_id|><|start_header_id|>assistant<|end_header_id|>
        '''
 
        return prompt
    
    def parse_llm_response(self, response):
        """Parse LLM's text response into structured events"""
        try:
            # Clean the response and handle nested lists
            response = response.strip()
            
            # Try parsing as JSON, handling potential nested structures
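            # (models occasionally wrap the array in another list; flatten to a flat list of dicts)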
            def flatten_events(data):
                if isinstance(data, list):
                    flattened = []
                    for item in data:
                        if isinstance(item, list):
                            flattened.extend(flatten_events(item))
                        elif isinstance(item, dict):
                            flattened.append(item)
                    return flattened
                return []
    
            try:
                # First, attempt direct JSON parsing
                events = json.loads(response)
                events = flatten_events(events)
            except json.JSONDecodeError:
                # If direct parsing fails, try to extract the outermost JSON array from surrounding text
                json_match = re.search(r'\[.*\]', response, re.DOTALL | re.MULTILINE)
                if json_match:
                    try:
                        events = json.loads(json_match.group(0))
                        events = flatten_events(events)
                    except json.JSONDecodeError:
                        events = []
                else:
                    events = []
            
            # Clean and validate events
            cleaned_events = []
            for event in events:
                # Ensure each event has at least a name
                if event.get('name'):
                    # Set default values if missing
                    event.setdefault('date', '')
                    event.setdefault('time', '')
                    event.setdefault('location', '')
                    event.setdefault('description', '')
                    cleaned_events.append(event)
            
            return cleaned_events
        
        except Exception as e:
            gr.Warning(f"Parsing error: {str(e)}")
            return []
    
    def scrape_events(self):
        """Main method to scrape events from all URLs"""
        # Ensure LLM is set up
        self.setup_llm()
        
        all_events = []
        
        for url in self.urls:
            try:
                # Fetch webpage
                html_content = self.fetch_webpage_content(url)
                
                # Extract readable text
                text_content = self.extract_text_from_html(html_content)
                
                # Generate prompt
                prompt = self.generate_event_extraction_prompt(text_content)
                
                # Generate response
                response = self.generate_with_model(prompt)

                print("------ response ------------")
                print(response)
                print("------ response ------------")
                
                # Parse events
                parsed_events = self.parse_llm_response(response)
                
                # Deduplicate and add
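                # (an MD5 of the stringified event dict serves as a cheap content fingerprint)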
                for event in parsed_events:
                    event_hash = hashlib.md5(str(event).encode()).hexdigest()
                    if event_hash not in self.event_cache:
                        self.event_cache.add(event_hash)
                        all_events.append(event)
                        
                        # Create and add iCal event
                        try:
                            ical_event = self.create_ical_event(event)
                            self.calendar.add_component(ical_event)
                        except Exception as ical_error:
                            gr.Warning(f"iCal creation error: {str(ical_error)}")
                
            except Exception as e:
                gr.Warning(f"Error processing {url}: {str(e)}")
        
        return all_events
    
    def create_ical_event(self, event):
        """Convert event to iCal format"""
        ical_event = icalendar.Event()
        
        # Set unique identifier
        ical_event.add('uid', str(uuid.uuid4()))
        
        # Add summary (name)
        ical_event.add('summary', event.get('name', 'Unnamed Event'))
        
        # Add description
        ical_event.add('description', event.get('description', ''))
        
        # Add location
        if event.get('location'):
            ical_event.add('location', event['location'])
        
        # Handle date and time
        try:
            # Parse date
            if event.get('date'):
                try:
                    event_date = datetime.strptime(event['date'], '%d.%m.%Y').date()
                    
                    # Parse time if available
                    event_time = datetime.strptime(event.get('time', '00:00'), '%H:%M').time() if event.get('time') else datetime.min.time()
                    
                    # Combine date and time
                    event_datetime = datetime.combine(event_date, event_time)
                    
                    # Localize the datetime to the specified timezone
                    localized_datetime = self.timezone.localize(event_datetime)
                    
                    # For all-day events, use date values; per RFC 5545, DTEND is
                    # exclusive, so the following day's date ends the event at midnight
                    if event_time == datetime.min.time():
                        start_datetime = localized_datetime.replace(hour=0, minute=0, second=0)
                        end_datetime = start_datetime + timedelta(days=1)
                        
                        # Add properties for all-day event
                        ical_event.add('dtstart', start_datetime.date())
                        ical_event.add('dtend', end_datetime.date())
                        ical_event.add('x-microsoft-cdo-alldayevent', 'TRUE')
                    else:
                        # For events with specific time, set 1-hour duration if not specified
                        end_datetime = localized_datetime + timedelta(hours=1)
                        
                        # Use TZID format with the configured timezone
                        ical_event['dtstart'] = icalendar.prop.vDDDTypes(localized_datetime)
                        ical_event['dtstart'].params['TZID'] = str(self.timezone)
                        
                        ical_event['dtend'] = icalendar.prop.vDDDTypes(end_datetime)
                        ical_event['dtend'].params['TZID'] = str(self.timezone)
                    
                except ValueError as date_err:
                    gr.Warning(f"Date parsing error: {date_err}")
        
        except Exception as e:
            gr.Warning(f"iCal event creation error: {str(e)}")
        
        return ical_event
    
    def get_ical_string(self):
        """Return iCal as a string"""
        return self.calendar.to_ical().decode('utf-8')
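
# Example programmatic use without the Gradio UI (a sketch; the URL is a placeholder):
#   scraper = EventScraper(["https://example.com/events"])
#   events = scraper.scrape_events()
#   with open("events.ics", "w", encoding="utf-8") as f:
#       f.write(scraper.get_ical_string())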

def scrape_events_with_urls(urls):
    """Wrapper function for Gradio interface"""
    # Split URLs by newline or comma
    url_list = [url.strip() for url in re.split(r'[\n,]+', urls) if url.strip()]
    
    if not url_list:
        gr.Warning("Please provide at least one valid URL.")
        return "", ""
    
    try:
        # Initialize scraper
        scraper = EventScraper(url_list)
        
        # Scrape events
        events = scraper.scrape_events()
        
        # Prepare events output
        events_str = json.dumps(events, indent=2)
        
        # Get iCal string
        ical_string = scraper.get_ical_string()
        
        return events_str, ical_string
    
    except Exception as e:
        gr.Warning(f"Error in event scraping: {str(e)}")
        return "", ""

# Create Gradio Interface
def create_gradio_app():
    with gr.Blocks() as demo:
        gr.Markdown("# Event Scraper ๐Ÿ—“๏ธ")
        gr.Markdown("Scrape events from web pages using an AI-powered event extraction tool.")
        
        with gr.Row():
            with gr.Column():
                url_input = gr.Textbox(
                    label="Enter URLs (comma or newline separated)", 
                    placeholder="https://example.com/events\nhttps://another-site.com/calendar"
                )
                scrape_btn = gr.Button("Scrape Events", variant="primary")
        
        with gr.Row():
            with gr.Column():
                events_output = gr.Textbox(label="Extracted Events (JSON)", lines=10)
            with gr.Column():
                ical_output = gr.Textbox(label="iCal Export", lines=10)
        
        scrape_btn.click(
            fn=scrape_events_with_urls, 
            inputs=url_input, 
            outputs=[events_output, ical_output]
        )
        
        gr.Markdown("**Note:** Requires an internet connection and may take a few minutes to process.")
        gr.Markdown("Set HF_TOKEN environment variable for authenticated access.")
    
    return demo

# Launch the app
if __name__ == "__main__":
    demo = create_gradio_app()
    demo.launch()