|
|
|
|
|
|
|
import gradio as gr |
|
import yfinance as yf |
|
import requests |
|
from bs4 import BeautifulSoup |
|
import pandas as pd |
|
import numpy as np |
|
from transformers import AutoModelForSequenceClassification, AutoTokenizer |
|
import xgboost as xgb |
|
from datetime import datetime, timedelta |
|
import json |
|
import warnings |
|
import os |
|
from dotenv import load_dotenv |
|
from decouple import config |
|
import logging |
|
|
|
|
|
warnings.filterwarnings('ignore') |
|
load_dotenv() |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
WHATSAPP_API_BASE_URL = os.getenv('WHATSAPP_API_BASE_URL') |
|
WHATSAPP_API_KEY = os.getenv('WHATSAPP_API_KEY') |
|
WHATSAPP_INSTANCE_NAME = os.getenv('WHATSAPP_INSTANCE_NAME') |
|
|
|
class ConfigManager: |
|
""" |
|
Centralized configuration management |
|
""" |
|
@staticmethod |
|
def get_api_config(): |
|
""" |
|
Retrieve API configurations securely |
|
""" |
|
return { |
|
'amfi_base_url': config('AMFI_API_URL', default='https://api.mfapi.in/mf'), |
|
} |
|
|
|
class WhatsAppManager: |
|
def __init__(self, base_url, api_key): |
|
self.base_url = base_url |
|
self.api_key = api_key |
|
|
|
def send_message(self, instance, phone, message): |
|
if not self.base_url or not self.api_key or not instance: |
|
logging.error("WhatsApp API base URL, key or instance not configured.") |
|
return "WhatsApp API base URL, key or instance not configured." |
|
|
|
headers = { |
|
'Content-Type': 'application/json', |
|
'Authorization': self.api_key |
|
} |
|
payload = json.dumps({ |
|
"phone": phone, |
|
"message": message |
|
}) |
|
try: |
|
response = requests.post(f"{self.base_url}/message/sendText/{instance}", headers=headers, data=payload) |
|
response.raise_for_status() |
|
return response.json() |
|
except requests.exceptions.RequestException as e: |
|
logging.error(f"Error sending WhatsApp message: {str(e)}") |
|
return f"Error sending message: {str(e)}" |
|
|
|
|
|
class AMFIApi: |
|
""" |
|
Mutual Fund API Handler with real-time data fetching |
|
""" |
|
@staticmethod |
|
def get_all_mutual_funds(): |
|
""" |
|
Retrieve comprehensive mutual funds list from AMFI API |
|
""" |
|
config = ConfigManager.get_api_config() |
|
try: |
|
response = requests.get(config['amfi_base_url']) |
|
if response.status_code == 200: |
|
return response.json() |
|
else: |
|
logging.error("Failed to fetch mutual fund data.") |
|
return "Error fetching mutual funds." |
|
except Exception as e: |
|
logging.error(f"API request error: {str(e)}") |
|
return f"Error fetching mutual funds: {str(e)}" |
|
|
|
@staticmethod |
|
def analyze_mutual_fund(scheme_code): |
|
""" |
|
Fetch real-time mutual fund NAV and analyze returns. |
|
""" |
|
try: |
|
config = ConfigManager.get_api_config() |
|
amfi_url = f"{config['amfi_base_url']}/{scheme_code}" |
|
response = requests.get(amfi_url) |
|
|
|
if response.status_code != 200: |
|
logging.error("Failed to fetch NAV data.") |
|
return None, None, "Failed to fetch live NAV data." |
|
|
|
fund_data = response.json() |
|
if 'data' not in fund_data: |
|
logging.error("Invalid fund data received.") |
|
return None, None, "Invalid fund data received." |
|
|
|
nav_data = pd.DataFrame(fund_data['data']) |
|
|
|
|
|
nav_data['date'] = pd.to_datetime(nav_data['date'], format='%d-%m-%Y') |
|
nav_data['nav'] = pd.to_numeric(nav_data['nav'], errors='coerce') |
|
|
|
|
|
nav_data = nav_data.sort_values('date') |
|
|
|
|
|
latest_nav = nav_data.iloc[-1]['nav'] |
|
first_nav = nav_data.iloc[0]['nav'] |
|
|
|
returns = { |
|
'scheme_name': fund_data.get('meta', {}).get('scheme_name', 'Unknown'), |
|
'current_nav': latest_nav, |
|
'initial_nav': first_nav, |
|
'total_return': ((latest_nav - first_nav) / first_nav) * 100 |
|
} |
|
|
|
return returns, nav_data[['date', 'nav']].rename(columns={'nav': 'NAV'}), None |
|
|
|
except Exception as e: |
|
logging.error(f"Analysis error: {str(e)}") |
|
return None, None, f"Analysis error: {str(e)}" |
|
|
|
|
|
def get_stock_data(symbol, period='3y'): |
|
try: |
|
logging.info(f"Fetching stock data for symbol: {symbol}") |
|
|
|
stock = yf.Ticker(symbol) |
|
logging.info(f"Ticker object created successfully for symbol: {symbol}") |
|
hist = stock.history(period=period) |
|
if hist.empty: |
|
logging.error(f"No stock data available for symbol: {symbol} after fetching history.") |
|
return f"No stock data available for symbol: {symbol}" |
|
logging.info(f"Successfully fetched stock data for symbol: {symbol}") |
|
return hist |
|
except Exception as e: |
|
logging.error(f"Error fetching stock data for symbol: {symbol}, error: {str(e)}") |
|
return f"Error fetching stock data: {str(e)}" |
|
|
|
def calculate_rsi(data, periods=14): |
|
delta = data.diff() |
|
gain = (delta.where(delta > 0, 0)).rolling(window=periods).mean() |
|
loss = (-delta.where(delta < 0, 0)).rolling(window=periods).mean() |
|
rs = gain / loss |
|
return 100 - (100 / (1 + rs)) |
|
|
|
def predict_stock(symbol): |
|
df = get_stock_data(symbol) |
|
if isinstance(df, str): |
|
return df |
|
|
|
logging.info(f"Dataframe before feature creation for {symbol}: \n{df.head()}") |
|
|
|
df['SMA_20'] = df['Close'].rolling(window=20).mean() |
|
df['SMA_50'] = df['Close'].rolling(window=50).mean() |
|
df['RSI'] = calculate_rsi(df['Close']) |
|
|
|
features = ['SMA_20', 'SMA_50', 'RSI', 'Volume'] |
|
X = df[features].dropna() |
|
y = df['Close'].shift(-1).dropna() |
|
|
|
logging.info(f"Dataframe after feature creation: \nX:\n{X.head()}\ny:\n{y.head()}") |
|
|
|
|
|
X = X.iloc[:len(y)] |
|
|
|
split = int(len(X) * 0.8) |
|
X_train, X_test = X[:split], X[split:] |
|
y_train, y_test = y[:split], y[split:] |
|
|
|
|
|
logging.info(f"Data split details: \nTrain Data size: {len(X_train)}\nTest Data Size: {len(X_test)}") |
|
|
|
if len(X_train) == 0 or len(y_train) == 0: |
|
logging.error(f"Insufficient training data for prediction for symbol: {symbol}") |
|
return "Insufficient data for prediction." |
|
|
|
model = xgb.XGBRegressor(objective='reg:squarederror', n_estimators=100) |
|
model.fit(X_train, y_train) |
|
|
|
if not X_test.empty: |
|
last_data = X_test.iloc[-1:] |
|
prediction = model.predict(last_data)[0] |
|
return prediction |
|
else: |
|
logging.warning(f"No test data available for prediction for symbol: {symbol}") |
|
return "No test data available for prediction." |
|
|
|
def analyze_sentiment(text): |
|
tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert") |
|
model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert") |
|
|
|
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True) |
|
outputs = model(**inputs) |
|
predictions = outputs.logits.softmax(dim=-1) |
|
|
|
labels = ['negative', 'neutral', 'positive'] |
|
return {label: float(pred) for label, pred in zip(labels, predictions[0])} |
|
|
|
def setup_notifications(phone, stock, mf, sentiment): |
|
if not WHATSAPP_API_BASE_URL or not WHATSAPP_API_KEY or not WHATSAPP_INSTANCE_NAME: |
|
return "WhatsApp API credentials or instance missing." |
|
|
|
whatsapp_manager = WhatsAppManager(WHATSAPP_API_BASE_URL, WHATSAPP_API_KEY) |
|
|
|
try: |
|
result = whatsapp_manager.send_message( |
|
WHATSAPP_INSTANCE_NAME, |
|
phone, |
|
"🎉 Welcome to AI Finance Manager!\nYour WhatsApp notifications have been set up successfully." |
|
) |
|
|
|
alerts = [] |
|
if stock: alerts.append("Stock") |
|
if mf: alerts.append("Mutual Fund") |
|
if sentiment: alerts.append("Sentiment") |
|
|
|
return f"WhatsApp notifications set up for: {', '.join(alerts)} - {result}" |
|
except Exception as e: |
|
logging.error(f"Error setting up notifications: {str(e)}") |
|
return f"Error setting up notifications: {str(e)}" |
|
|
|
|
|
def chatbot_response(user_input): |
|
user_input = user_input.lower() |
|
|
|
if "stock" in user_input: |
|
parts = user_input.split() |
|
if len(parts) > 1: |
|
symbol = parts[-1].upper() |
|
prediction = predict_stock(symbol) |
|
if isinstance(prediction,str): |
|
return prediction |
|
else: |
|
return f"The predicted next-day closing price for {symbol} is {prediction:.2f}" |
|
else: |
|
return "Please provide a stock symbol." |
|
|
|
|
|
elif "mutual fund" in user_input: |
|
parts = user_input.split() |
|
if len(parts) > 2 and parts[1] == "code": |
|
scheme_code = parts[-1] |
|
mf_returns, mf_nav_history, error = AMFIApi.analyze_mutual_fund(scheme_code) |
|
if error: |
|
return error |
|
else: |
|
return f"Mutual Fund Analysis:\nName: {mf_returns.get('scheme_name', 'Unknown')}\nCurrent NAV: {mf_returns.get('current_nav', 'N/A'):.2f}\nTotal Return: {mf_returns.get('total_return', 'N/A'):.2f}%" |
|
else: |
|
return "Please enter the mutual fund scheme code for analysis (e.g. 'analyze mutual fund code 123456')." |
|
elif "sentiment" in user_input: |
|
return "Enter the financial news text for sentiment analysis." |
|
elif user_input.startswith("analyze sentiment"): |
|
text = user_input[len("analyze sentiment"):].strip() |
|
if text: |
|
sentiment_result = analyze_sentiment(text) |
|
if sentiment_result: |
|
return f"Sentiment Analysis: {sentiment_result}" |
|
else: |
|
return "No text provided for sentiment analysis." |
|
else: |
|
return "Please provide text for sentiment analysis." |
|
return "I can help with Stock Analysis, Mutual Funds, and Sentiment Analysis. Please ask your query." |
|
|
|
|
|
|
|
def create_gradio_interface(): |
|
with gr.Blocks() as app: |
|
gr.Markdown("# AI Finance & Stock Manager with Chat and WhatsApp Alerts") |
|
|
|
with gr.Tab("Chat"): |
|
chat_input = gr.Textbox(label="Ask about Stocks, Mutual Funds, or Sentiment Analysis") |
|
chat_output = gr.Textbox(label="AI Response", interactive=False) |
|
chat_btn = gr.Button("Ask AI") |
|
|
|
with gr.Tab("Stock Analysis"): |
|
stock_input = gr.Textbox(label="Enter Stock Symbol (e.g., AAPL)") |
|
stock_btn = gr.Button("Analyze Stock") |
|
stock_output = gr.DataFrame() |
|
prediction_output = gr.Number(label="Predicted Next Day Close Price") |
|
|
|
with gr.Tab("Mutual Fund Analysis"): |
|
mf_code = gr.Textbox(label="Enter Scheme Code") |
|
mf_analyze_btn = gr.Button("Analyze Fund") |
|
|
|
|
|
mf_returns = gr.JSON(label="Fund Returns") |
|
mf_nav_history = gr.DataFrame(label="NAV History") |
|
mf_analysis_error = gr.Textbox(label="Error Messages", visible=False) |
|
|
|
with gr.Tab("WhatsApp Notifications"): |
|
phone_input = gr.Textbox(label="WhatsApp Number (with country code)") |
|
enable_stock_alerts = gr.Checkbox(label="Stock Alerts") |
|
enable_mf_alerts = gr.Checkbox(label="Mutual Fund Alerts") |
|
enable_sentiment_alerts = gr.Checkbox(label="Sentiment Alerts") |
|
notification_status = gr.Textbox(label="Notification Status", interactive=False) |
|
setup_btn = gr.Button("Setup WhatsApp Notifications") |
|
|
|
with gr.Tab("Sentiment Analysis"): |
|
text_input = gr.Textbox(label="Enter financial news or text") |
|
sentiment_btn = gr.Button("Analyze Sentiment") |
|
sentiment_output = gr.Label() |
|
|
|
chat_btn.click( |
|
fn=chatbot_response, |
|
inputs=chat_input, |
|
outputs=chat_output |
|
) |
|
|
|
stock_btn.click( |
|
fn=lambda x: (get_stock_data(x), predict_stock(x)), |
|
inputs=stock_input, |
|
outputs=[stock_output, prediction_output] |
|
) |
|
mf_analyze_btn.click( |
|
fn=AMFIApi.analyze_mutual_fund, |
|
inputs=mf_code, |
|
outputs=[mf_returns,mf_nav_history,mf_analysis_error] |
|
) |
|
|
|
sentiment_btn.click( |
|
fn=analyze_sentiment, |
|
inputs=text_input, |
|
outputs=sentiment_output |
|
) |
|
setup_btn.click( |
|
fn=setup_notifications, |
|
inputs=[phone_input, enable_stock_alerts, enable_mf_alerts, enable_sentiment_alerts], |
|
outputs=notification_status |
|
) |
|
return app |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
app = create_gradio_interface() |
|
app.launch(share=True, debug=True) |