'''Copyright 2024 Ashok Kumar

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''

import html
import json
import os
import re
import time
import warnings
from datetime import datetime
from difflib import get_close_matches
from functools import lru_cache

import requests
import pandas as pd
import geopandas as gpd
import tzlocal
import pytz
from PIL import Image
import matplotlib.pyplot as plt
from geopy.exc import GeocoderServiceError, GeocoderTimedOut
from geopy.geocoders import Nominatim

# Kept between the imports on purpose: the original silenced warnings before
# importing the mapping / UI libraries below.
warnings.filterwarnings('ignore')

import folium
from folium import plugins
import streamlit as st
import streamlit_folium as st_folium
from huggingface_hub import InferenceApi, login, InferenceClient
import branca.colormap as cm

from data import flight_data


# Column order applied to each row of the "states" list returned by the
# OpenSky /api/states/all endpoint.
OPENSKY_COLUMNS = [
    "icao24", "callsign", "origin_country", "time_position", "last_contact",
    "longitude", "latitude", "baro_altitude", "on_ground", "velocity",
    "true_track", "vertical_rate", "sensors", "geo_altitude", "squawk",
    "spi", "position_source",
]

# Colour ramps selectable in the sidebar; any other value falls back to "hot".
_COLOR_SCHEMES = {
    "rainbow": ['red', 'yellow', 'green', 'blue', 'purple'],
    "ice": ['white', 'lightblue', 'blue'],
}
_DEFAULT_COLOR_SCHEME = ['yellow', 'orange', 'red']

# Unit suffix appended to a value when answering a question about a flight.
_UNIT_SUFFIXES = {
    'baro_altitude': ' meters',
    'geo_altitude': ' meters',
    'velocity': ' m/s',
    'true_track': ' degrees',
    'vertical_rate': ' m/s',
}

# (label, column, suffix) for every line of a flight marker popup.
_POPUP_FIELDS = [
    ('ICAO24', 'icao24', ''),
    ('Origin Country', 'origin_country', ''),
    ('Time Position', 'time_position', ''),
    ('Last Contact', 'last_contact', ''),
    ('Baro Altitude', 'baro_altitude', ' m'),
    ('Geo Altitude', 'geo_altitude', ' m'),
    ('Velocity', 'velocity', ' m/s'),
    ('True Track', 'true_track', '°'),
    ('Vertical Rate', 'vertical_rate', ' m/s'),
    ('Squawk', 'squawk', ''),
    ('On Ground', 'on_ground', ''),
    ('SPI', 'spi', ''),
    ('Position Source', 'position_source', ''),
]

# NOTE(review): the icon and tooltip templates below contain no HTML tags in
# the reviewed source, although `rotation_angle` and `color_hex` are passed to
# the icon template. The markup was presumably lost -- restore it from the
# original file; the text is reproduced here exactly as seen.
ICON_TEMPLATE = """
"""

TOOLTIP_TEMPLATE = """
Flight: {callsign}
{rows}
"""

# Keywords that may introduce the flight identifier, in priority order.
_QUESTION_KEYWORDS = ("for", "of", "about")


def _has_value(value):
    """Return True when *value* is neither None nor NaN/NaT.

    Containers (e.g. a list in the ``sensors`` column) count as present;
    ``pd.notna`` would return an array for them, which cannot be used in a
    boolean context.
    """
    if value is None:
        return False
    if isinstance(value, (list, tuple, set, dict)):
        return True
    try:
        return bool(pd.notna(value))
    except (TypeError, ValueError):
        return True


def _display(value, missing='N/A'):
    """Return *value* as HTML-escaped text, or *missing* when it is absent."""
    return html.escape(str(value)) if _has_value(value) else missing


# Cache the airport data to avoid reloading it every time
@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_airport_data():
    """Download the airport table and return it as a DataFrame.

    The file has no header row, so the column names are supplied here.
    """
    data_url = "https://raw.githubusercontent.com/ashok2216-A/ashok_airport-data/main/data/airports.dat"
    column_names = ["Airport ID", "Name", "City", "Country", "IATA/FAA", "ICAO",
                    "Latitude", "Longitude", "Altitude", "Timezone", "DST",
                    "Tz database time zone", "Type", "Source"]
    return pd.read_csv(data_url, header=None, names=column_names)


# Cache geocoding results
@st.cache_data(ttl=3600)
def get_location(country):
    """Geocode *country* with Nominatim and return the geopy result.

    Returns None when nothing matches. Geocoder errors are deliberately not
    caught here, so that a transient failure is not cached for an hour;
    callers handle them.
    """
    geolocator = Nominatim(user_agent="flight_tracker")
    # An explicit timeout makes slow responses less likely to fail.
    return geolocator.geocode(country, timeout=10)


# Cache flight data fetching
@st.cache_data(ttl=60)  # Cache for 1 minute
def fetch_flight_data(lat_min, lat_max, lon_min, lon_max):
    """Fetch the OpenSky state vectors inside the given bounding box.

    Returns the decoded JSON dict. On any failure an error is shown in the UI
    and ``{'states': [], 'time': 0}`` is returned instead.
    """
    empty_result = {'states': [], 'time': 0}
    try:
        # OpenSky Network API endpoint
        url = "https://opensky-network.org/api/states/all"
        params = {
            'lamin': lat_min,
            'lamax': lat_max,
            'lomin': lon_min,
            'lomax': lon_max,
        }
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        if not data or 'states' not in data:
            st.warning("No flight data available for the selected area.")
            return empty_result
        return data
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching flight data: {str(e)}")
        return empty_result
    except json.JSONDecodeError as e:
        st.error(f"Error parsing flight data: {str(e)}")
        return empty_result
    except Exception as e:
        st.error(f"Unexpected error: {str(e)}")
        return empty_result


# Hugging Face model configuration
HF_API_URL = "https://api-inference.huggingface.co/models/google/flan-t5-large"
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"}


def query_llm(prompt):
    """Send *prompt* to the hosted model and return the generated text.

    Returns None when no token is configured or the request fails, so callers
    can fall back to displaying the raw data.
    """
    if not HF_TOKEN:
        # Without a token the header would read "Bearer None"; skip the call.
        return None
    try:
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_length": 250,
                "temperature": 0.1,
                "top_p": 0.95,
                "do_sample": False,
            },
        }
        # Timeout keeps a stalled endpoint from hanging the whole page.
        response = requests.post(HF_API_URL, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()[0]['generated_text']
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 403:
            st.warning("Language model access is currently restricted. Using direct flight data display instead.")
        else:
            st.error(f"Error querying language model: {str(e)}")
        return None
    except Exception as e:
        st.error(f"Error querying language model: {str(e)}")
        return None


@st.cache_resource(show_spinner=False)
def _load_sentence_model():
    """Load the sentence-embedding model once and share it across reruns."""
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer('all-MiniLM-L6-v2')


def create_flight_embeddings(geo_df):
    """Create embeddings for flight data to enable semantic search.

    Returns ``(embeddings, flight_texts)``, or ``(None, None)`` when the
    embeddings cannot be created.
    """
    try:
        model = _load_sentence_model()
        # One sentence per flight describing the fields used for matching.
        flight_texts = [
            f"Flight {row['callsign']} from {row['origin_country']} "
            f"at altitude {row['baro_altitude']}m, speed {row['velocity']}m/s, "
            f"heading {row['true_track']}°"
            for _, row in geo_df.iterrows()
        ]
        embeddings = model.encode(flight_texts)
        return embeddings, flight_texts
    except Exception as e:
        st.warning(f"Could not create embeddings: {str(e)}")
        return None, None


def find_similar_flights(identifier, geo_df, embeddings, flight_texts, threshold=0.7):
    """Find similar flights using semantic search.

    Returns the rows of *geo_df* whose embedding has a cosine similarity to
    *identifier* above *threshold*, or None when there are none or the search
    fails.
    """
    try:
        from sklearn.metrics.pairwise import cosine_similarity

        model = _load_sentence_model()
        query_embedding = model.encode([identifier])
        similarities = cosine_similarity(query_embedding, embeddings)[0]
        similar_indices = [i for i, sim in enumerate(similarities) if sim > threshold]
        if similar_indices:
            return geo_df.iloc[similar_indices]
        return None
    except Exception as e:
        st.warning(f"Error in semantic search: {str(e)}")
        return None


def _extract_identifier(question):
    """Return the upper-cased text following 'for', 'of' or 'about'.

    Keywords are matched as whole words, and surrounding punctuation (such as
    a trailing '?') is removed. Returns None when no identifier is found.
    """
    for keyword in _QUESTION_KEYWORDS:
        pieces = re.split(rf"\b{keyword}\b", question, flags=re.IGNORECASE)
        if len(pieces) < 2:
            continue
        candidate = pieces[-1].strip(" \t\r\n?!.,;:'\"")
        if candidate:
            return candidate.upper()
    return None


def _format_flight_value(column, value):
    """Return *value* as text with the unit or hemisphere for *column*."""
    if column == 'latitude':
        return f"{abs(value)}° {'N' if value >= 0 else 'S'}"
    if column == 'longitude':
        return f"{abs(value)}° {'E' if value >= 0 else 'W'}"
    suffix = _UNIT_SUFFIXES.get(column)
    return f"{value}{suffix}" if suffix else str(value)


def query_flight_data(geo_df, question):
    """Answer a free-text *question* about one flight in *geo_df*.

    The flight is looked up by, in order: exact callsign, exact ICAO24 code,
    callsign compared on alphanumeric characters only, close callsign match,
    and finally semantic similarity. The answer comes from the language model
    when available, otherwise it is a plain listing of the flight's fields.

    Returns a string: the answer, or a message explaining what went wrong.
    """
    if geo_df is None or 'callsign' not in geo_df or 'icao24' not in geo_df:
        return "No flight data is currently available to answer questions."

    question = question.lower().strip()
    identifier = _extract_identifier(question)
    if not identifier:
        return "Please specify a flight identifier (callsign or ICAO code) in your question."

    # Strip before comparing so space-padded callsigns can still match exactly.
    callsigns = geo_df['callsign'].fillna('').astype(str).str.strip().str.upper()
    icao_codes = geo_df['icao24'].fillna('').astype(str).str.strip().str.upper()
    known_callsigns = [c for c in callsigns.unique() if c]

    # Exact match on callsign, then on ICAO24 code.
    matched = geo_df[callsigns == identifier]
    if matched.empty:
        matched = geo_df[icao_codes == identifier]

    # Match ignoring spaces and special characters.
    if matched.empty:
        compact_identifier = ''.join(filter(str.isalnum, identifier))
        # An empty key would otherwise match every flight without a callsign.
        if compact_identifier:
            compact_callsigns = callsigns.map(lambda c: ''.join(filter(str.isalnum, c)))
            matched = geo_df[compact_callsigns == compact_identifier]

    # Fuzzy match on callsign.
    if matched.empty:
        close_matches = get_close_matches(identifier, known_callsigns, n=1, cutoff=0.8)
        if close_matches:
            matched = geo_df[callsigns == close_matches[0]]

    # Last resort: semantic search over textual flight descriptions.
    if matched.empty and not geo_df.empty:
        try:
            embeddings, flight_texts = create_flight_embeddings(geo_df)
            if embeddings is not None:
                similar_flights = find_similar_flights(identifier, geo_df, embeddings, flight_texts)
                if similar_flights is not None and not similar_flights.empty:
                    matched = similar_flights
                    st.info(f"Found similar flight(s) to {identifier}")
        except Exception as e:
            st.warning(f"Semantic search failed: {str(e)}")

    if matched.empty:
        if known_callsigns:
            return f"Could not find flight {identifier}. Available flights: {', '.join(known_callsigns[:10])}..."
        return f"Could not find flight {identifier}. No flights currently available in the selected area."

    # Collect every populated field of the first matching flight.
    flight_info = {}
    for col in matched.columns:
        value = matched[col].iloc[0]
        if _has_value(value):
            flight_info[col] = _format_flight_value(col, value)

    if not flight_info:
        return f"No information available for flight {identifier}."

    # Try to get LLM response, but fall back to direct display if it fails
    try:
        prompt = (
            f"Answer this question about flight {identifier}: {question}\n"
            f"Available flight data: {json.dumps(flight_info, indent=2)}\n"
            "Provide a clear and concise answer focusing on the specific information requested."
        )
        llm_response = query_llm(prompt)
        if llm_response:
            return llm_response
    except Exception as e:
        st.warning(f"Language model unavailable, showing raw data instead: {str(e)}")

    # Fallback to direct data display
    lines = [f"Flight Information for {identifier}:"]
    lines.extend(
        f"- {key.replace('_', ' ').title()}: {value}"
        for key, value in flight_info.items()
    )
    return "\n".join(lines) + "\n"


@st.cache_data(ttl=60)  # Cache for 1 minute
def get_traffic_gdf(lat_min, lat_max, lon_min, lon_max, local_time_zone, _loc, flight_info):
    """Build a GeoDataFrame of the flights inside the bounding box.

    Also writes the page title and a summary of the request to the Streamlit
    page. ``_loc`` is only displayed; its leading underscore excludes it from
    the cache key.

    Returns the GeoDataFrame, or None when no data is available or processing
    fails.
    """
    json_dict = fetch_flight_data(lat_min, lat_max, lon_min, lon_max)

    if not json_dict or not json_dict.get('states'):
        st.warning("No flight data available for the selected area.")
        return None

    try:
        unix_timestamp = int(json_dict["time"])
        local_timezone = pytz.timezone(local_time_zone)
        local_time = datetime.fromtimestamp(unix_timestamp, local_timezone).strftime('%Y-%m-%d %H:%M:%S')

        # Truncate rows so that any extra trailing fields do not break the
        # fixed column list.
        states = [state[:len(OPENSKY_COLUMNS)] for state in json_dict["states"]]
        state_df = pd.DataFrame(states, columns=OPENSKY_COLUMNS)
        state_df['time'] = local_time

        gdf = gpd.GeoDataFrame(
            state_df,
            geometry=gpd.points_from_xy(state_df.longitude, state_df.latitude),
            crs="EPSG:4326"
        )

        # Display information
        st.title("Live Flight Tracker")
        st.subheader('Flight Details', divider='rainbow')
        st.write('Location: {0}'.format(_loc))
        st.write('Current Local Time: {0}-{1}:'.format(local_time, local_time_zone))
        st.write("Minimum_latitude is {0} and Maximum_latitude is {1}".format(lat_min, lat_max))
        st.write("Minimum_longitude is {0} and Maximum_longitude is {1}".format(lon_min, lon_max))
        st.write('Number of Visible Flights: {}'.format(len(json_dict['states'])))
        st.write('Plotting the flight: {}'.format(flight_info))
        st.subheader('Map Visualization', divider='rainbow')
        st.write('****Click ":orange[Update Map]" Button to Refresh the Map****')

        return gdf
    except Exception as e:
        st.error(f"Error processing flight data: {str(e)}")
        return None


def _bounding_box(center_lat, center_lon, view_level):
    """Return ``(lat_min, lat_max, lon_min, lon_max)`` around the centre.

    The box is asymmetric (12/10 degrees of latitude and 18/10 degrees of
    longitude per view level) and is clamped to valid coordinates.
    """
    lat_min = max(-90.0, center_lat - 12 * view_level)
    lat_max = min(90.0, center_lat + 10 * view_level)
    lon_min = max(-180.0, center_lon - 18 * view_level)
    lon_max = min(180.0, center_lon + 10 * view_level)
    return lat_min, lat_max, lon_min, lon_max


def _build_colormap(color, vmin, vmax):
    """Return the linear colormap for the chosen scheme and value range."""
    return cm.LinearColormap(
        colors=_COLOR_SCHEMES.get(color, _DEFAULT_COLOR_SCHEME),
        vmin=vmin,
        vmax=vmax,
    )


def _tooltip_html(row):
    """Return the hover tooltip text listing every OpenSky field of *row*."""
    tooltip_rows = []
    for col in OPENSKY_COLUMNS:
        val = _display(row[col])
        if col in ('baro_altitude', 'geo_altitude'):
            val = f"{val} m"
        elif col == 'velocity':
            val = f"{val} m/s"
        elif col == 'true_track':
            val = f"{val}°"
        tooltip_rows.append(f"\n{col}:\n{val}\n")
    return TOOLTIP_TEMPLATE.format(
        callsign=_display(row['callsign'], missing='Unknown'),
        rows='\n'.join(tooltip_rows),
    )


def _popup_html(row):
    """Return the click popup text for the flight in *row*."""
    parts = [f"\nFlight: {_display(row['callsign'], missing='Unknown')}\n"]
    parts.extend(
        f"{label}:\n{_display(row[column])}{suffix}\n"
        for label, column, suffix in _POPUP_FIELDS
    )
    return "".join(parts)


def flight_tracking(flight_view_level, country, local_time_zone, flight_info, airport, color):
    """Render the live flight map for *country* in the Streamlit page.

    Args:
        flight_view_level: Multiplier for the size of the bounding box.
        country: Place name to geocode and centre the map on.
        local_time_zone: Time zone name used to display the data timestamp.
        flight_info: Column of the flight data used to colour the markers.
        airport: ``1`` to also draw airport markers.
        color: Colour scheme name ("rainbow", "ice", anything else is "hot").

    Returns:
        None. Problems are reported through Streamlit messages.
    """
    try:
        loc = get_location(country)
    except (GeocoderTimedOut, GeocoderServiceError) as e:
        st.error(f"Could not reach the geocoding service: {str(e)}")
        return
    if not loc:
        st.error("Could not find location. Please try a different country name.")
        return

    center_lat, center_lon = loc[1]
    lat_min, lat_max, lon_min, lon_max = _bounding_box(center_lat, center_lon, flight_view_level)

    airport_df = load_airport_data()
    airport_locations = airport_df[["Name", "City", "Country", "IATA/FAA", "Latitude", "Longitude"]]

    # Airports of the selected country that lie inside the view bounds. The
    # country is matched literally, and rows without a country are skipped.
    airport_country_loc = airport_locations[
        (airport_locations['Country'].str.contains(str(country), case=False, na=False, regex=False)) &
        (airport_locations['Latitude'] >= lat_min) &
        (airport_locations['Latitude'] <= lat_max) &
        (airport_locations['Longitude'] >= lon_min) &
        (airport_locations['Longitude'] <= lon_max)
    ]

    geo_df = get_traffic_gdf(lat_min, lat_max, lon_min, lon_max, local_time_zone, country, flight_info)
    if geo_df is None:
        return

    m = folium.Map(
        location=[center_lat, center_lon],
        zoom_start=6,
        tiles='CartoDB dark_matter',
        control_scale=True
    )

    # Remove grid lines by adding a custom tile layer
    folium.TileLayer(
        tiles='https://{s}.basemaps.cartocdn.com/dark_nolabels/{z}/{x}/{y}{r}.png',
        attr='© OpenStreetMap contributors © CARTO',
        name='No Labels',
        show=True
    ).add_to(m)

    # Work on a float copy of the metric: boolean columns such as `on_ground`
    # cannot be used as colormap bounds, because numpy booleans do not
    # support subtraction.
    metric = pd.to_numeric(geo_df[flight_info], errors='coerce').astype(float)
    vmin, vmax = metric.min(), metric.max()
    if pd.isna(vmin) or pd.isna(vmax):
        vmin, vmax = 0.0, 1.0
    elif vmin == vmax:
        # Avoid a zero-width colour scale.
        vmax = vmin + 1.0
    colormap = _build_colormap(color, float(vmin), float(vmax))

    # Add flight markers
    for (_, row), metric_value in zip(geo_df.iterrows(), metric):
        if not (_has_value(row['latitude']) and _has_value(row['longitude'])):
            continue

        # Flights without a value for the metric use the minimum's colour.
        value = float(metric_value) if pd.notna(metric_value) else float(vmin)
        color_hex = colormap(value)

        rotation_angle = row['true_track'] if _has_value(row['true_track']) else 0
        icon_html = ICON_TEMPLATE.format(rotation_angle=rotation_angle, color_hex=color_hex)

        icon = folium.DivIcon(
            html=icon_html,
            icon_size=(24, 24),
            icon_anchor=(12, 12)
        )

        folium.Marker(
            location=[row['latitude'], row['longitude']],
            icon=icon,
            popup=folium.Popup(_popup_html(row), max_width=300),
            tooltip=_tooltip_html(row)
        ).add_to(m)

    # Add airports if selected
    if airport == 1:
        for _, row in airport_country_loc.iterrows():
            if pd.notna(row['Latitude']) and pd.notna(row['Longitude']):
                folium.Marker(
                    location=[row['Latitude'], row['Longitude']],
                    icon=folium.Icon(icon='plane', prefix='fa', color='blue'),
                    popup=folium.Popup(
                        f"""

{row['Name']}

IATA: {row['IATA/FAA']}
City: {row['City']}
Country: {row['Country']}

""",
                        max_width=300
                    ),
                    tooltip=f"Airport: {row['Name']}"
                ).add_to(m)

    # Add colormap legend and layer control, then display the map
    colormap.add_to(m)
    folium.LayerControl().add_to(m)
    st_folium.folium_static(m, width=1200, height=600)
    return None


st.set_page_config(layout="wide")

image = Image.open('logo.png')
st.sidebar.image(image, width=150)
st.sidebar.subheader("Configure Map", divider='rainbow')

with st.sidebar:
    Refresh = st.button('Update Map', key=1)

    on = st.toggle('View Airports')
    if on:
        air_port = 1
        st.write(':rainbow[Nice Work Buddy!]')
        st.write('Now Airports are Visible')
    else:
        air_port = 0

    view = st.slider('Increase Flight Visibility', 1, 6, 2)
    st.write("You Selected:", view)

    cou = st.text_input('Type Country Name', 'north america')
    st.write('The current Country name is', cou)

    # Named `time_zone` so the imported `time` module is not shadowed.
    time_zone = st.text_input('Type Time Zone Name (Ex: America/Toronto, Europe/Berlin)', 'Asia/Kolkata')
    st.write('The current Time Zone is', time_zone)

    info = st.selectbox(
        'Select Flight Information',
        ('baro_altitude', 'on_ground', 'velocity', 'geo_altitude'))
    st.write('Plotting the data of Flight:', info)

    clr = st.radio('Pick A Color for Scatter Plot', ["rainbow", "ice", "hot"])
    if clr == "rainbow":
        st.write('The current color is', "****:rainbow[Rainbow]****")
    elif clr == 'ice':
        st.write('The current color is', "****:blue[Ice]****")
    elif clr == 'hot':
        st.write('The current color is', "****:red[Hot]****")

# with st.spinner('Wait!, We Requesting API Data...'):
# try:
flight_tracking(flight_view_level=view, country=cou, flight_info=info,
                local_time_zone=time_zone, airport=air_port, color=clr)

st.subheader('Ask your Questions!', divider='rainbow')
st.write("Google's TAPAS base LLM model 🤖")

geo_df = flight_data(flight_view_level=view, country=cou, flight_info=info,
                     local_time_zone=time_zone, airport=1)
question = st.text_input('Type your questions here', "What is the squawk code for SWR9XD?")
result = query_flight_data(geo_df, question)
# Render the text returned by query_flight_data for the question above.
st.markdown(result)

# --- Disabled code, kept for reference ---------------------------------------
# Tail of the commented-out spinner/try wrapper around the tracker call:
# except TypeError:
#     st.error(':red[Error: ] Please Re-run this page.', icon="🚨")
#     st.button('Re-run', type="primary")
#     st.snow()

# Commented-out chatbot prototype built on the Hugging Face InferenceClient:
# import streamlit as st
# from huggingface_hub import InferenceClient
# import os
# hf_token = os.getenv("HF_TOKEN")
# # Set up the Hugging Face Inference Client
# client = InferenceClient(
#     provider="together",  # Replace with the correct provider if needed
#     api_key= hf_token  # Replace with your Hugging Face API key
# )
# # Streamlit app title
# st.title("🤖 Deepseek R1 Chatbot")
# st.write("Chat with the Deepseek R1 model powered by Hugging Face Inference API.")
# # Initialize session state to store chat history
# if "messages" not in st.session_state:
#     st.session_state.messages = []
# # Display chat history
# for message in st.session_state.messages:
#     with st.chat_message(message["role"]):
#         st.markdown(message["content"])
# # User input
# if prompt := st.chat_input("What would you like to ask?"):
#     # Add user message to chat history
#     st.session_state.messages.append({"role": "user", "content": prompt})
#     with st.chat_message("user"):
#         st.markdown(prompt)
#     # Generate response from Deepseek R1 model
#     with st.spinner("Thinking..."):
#         try:
#             # Prepare the messages for the model
#             messages = [{"role": m["role"], "content": m["content"]} for m in st.session_state.messages]
#             # Call the Hugging Face Inference API
#             completion = client.chat.completions.create(
#                 model="deepseek-ai/DeepSeek-R1",  # Replace with the correct model name
#                 messages=messages,
#                 max_tokens=500
#             )
#             # Extract the model's response
#             response = completion.choices[0].message.content
#             # Add model's response to chat history
#             st.session_state.messages.append({"role": "assistant", "content": response})
#             with st.chat_message("assistant"):
#                 st.markdown(response)
#         except Exception as e:
#             st.error(f"An error occurred: {e}")