# stocktrader / app.py
# Author: Benjamin Consolvo
# Last commit: "syntax error colon" (3fd0696)
import streamlit as st
import yfinance as yf
import alpaca_trade_api as alpaca
from newsapi import NewsApiClient
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from datetime import datetime, timedelta
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import logging
import threading
import time
import json
import os
import plotly.graph_objs as go
from sklearn.preprocessing import minmax_scale
from plotly.subplots import make_subplots
# Configure logging with timestamps
# Records are emitted at INFO and above as "<time> - <LEVEL> - <message>".
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
# Use session state keys instead of file paths
AUTO_TRADE_LOG_KEY = "auto_trade_log" # Session state key for trade log
AUTO_TRADE_INTERVAL = 10800 # Interval in seconds (e.g., 10800 seconds = 3 hours)
# Runs at import time, ahead of every other Streamlit call made in this file.
st.set_page_config(layout="wide")
class AlpacaTrader:
    """Wrapper around the Alpaca REST client that places orders and tracks them.

    Attributes:
        alpaca: ``alpaca_trade_api.REST`` client used for every API call.
        cash: Locally tracked cash figure; only ``update_portfolio`` changes it.
        holdings: Mapping of symbol -> market value in USD.
        trades: Trade records (dicts) appended by ``buy``, ``sell`` and
            ``update_portfolio``.
    """

    # Buys are refused while the account's cash balance is below this amount.
    MIN_CASH_TO_BUY = 1000

    def __init__(self, API_KEY, API_SECRET, BASE_URL):
        """Create the REST client and start with empty local bookkeeping."""
        self.alpaca = alpaca.REST(API_KEY, API_SECRET, BASE_URL)
        self.cash = 0
        self.holdings = {}
        self.trades = []

    def get_market_status(self):
        """Return the ``is_open`` flag of the Alpaca market clock."""
        return self.alpaca.get_clock().is_open

    def _record_trade(self, order, symbol, qty, action, reason):
        """Log a submitted order and, when the API returned one, remember it."""
        verb = 'Bought' if action == 'Buy' else 'Sold'
        if reason:
            logger.info(f"{verb} {qty} shares of {symbol} [Reason: {reason}]")
        else:
            logger.info(f"{verb} {qty} shares of {symbol}")
        if order:
            self.trades.append({
                'symbol': symbol,
                'qty': qty,
                'action': action,
                'time': datetime.now(),
                'reason': reason,
            })

    def buy(self, symbol, qty, reason=None):
        """Submit a market day order to buy ``qty`` shares of ``symbol``.

        Returns:
            The order object returned by Alpaca, or ``None`` when the cash
            balance is below ``MIN_CASH_TO_BUY`` or any API call fails.
        """
        try:
            # Ensure at least $1000 in cash before buying
            account = self.alpaca.get_account()
            cash_balance = float(account.cash)
            if cash_balance < self.MIN_CASH_TO_BUY:
                logger.warning(f"Low cash: (${cash_balance}) to buy {symbol}. Minimum ${self.MIN_CASH_TO_BUY} required.")
                return None
            order = self.alpaca.submit_order(symbol=symbol, qty=qty, side='buy', type='market', time_in_force='day')
            self._record_trade(order, symbol, qty, 'Buy', reason)
            return order
        except Exception as e:
            logger.error(f"Error buying {symbol}: {e}")
            return None

    def sell(self, symbol, qty, reason=None):
        """Submit a market day order to sell ``qty`` shares of ``symbol``.

        The sale is only attempted when an open position with at least
        ``qty`` shares exists.

        Returns:
            The order object returned by Alpaca, or ``None`` when there is no
            sufficient position or any API call fails.
        """
        try:
            # The position lookup sits inside the try so that an API failure is
            # reported the same way as in buy() instead of escaping to the caller.
            positions = {p.symbol: float(p.qty) for p in self.alpaca.list_positions()}
            if symbol not in positions:
                logger.warning(f"No position in {symbol}. Sell not attempted.")
                return None
            if positions[symbol] < qty:
                logger.warning(f"Not enough shares to sell: {qty} requested, {positions[symbol]} available for {symbol}. Sell not attempted.")
                return None
            order = self.alpaca.submit_order(symbol=symbol, qty=qty, side='sell', type='market', time_in_force='day')
            self._record_trade(order, symbol, qty, 'Sell', reason)
            return order
        except Exception as e:
            logger.error(f"Error selling {symbol}: {e}")
            return None

    def getHoldings(self):
        """Refresh and return the symbol -> market value mapping.

        The mapping is rebuilt from the current positions on every call so
        that symbols which are no longer held disappear from it.
        """
        positions = self.alpaca.list_positions()
        self.holdings = {
            position.symbol: float(position.market_value) for position in positions
        }
        # Return holdings as a dictionary for internal use
        return self.holdings

    def getCash(self):
        """Return the ``cash`` field of the Alpaca account, exactly as the API reports it."""
        return self.alpaca.get_account().cash

    def update_portfolio(self, symbol, price, qty, action):
        """Apply a trade to the local cash/holdings bookkeeping and record it.

        Args:
            symbol: Ticker the trade refers to.
            price: Price per share.
            qty: Number of shares.
            action: ``'buy'`` or ``'sell'``; any other value only records the trade.
        """
        value = price * qty
        if action == 'buy':
            self.cash -= value
            self.holdings[symbol] = self.holdings.get(symbol, 0) + value
        elif action == 'sell':
            self.cash += value
            # A symbol that is not tracked locally counts as a zero holding
            # rather than raising KeyError after cash was already adjusted.
            remaining = self.holdings.get(symbol, 0) - value
            if remaining <= 0:
                self.holdings.pop(symbol, None)
            else:
                self.holdings[symbol] = remaining
        self.trades.append({'symbol': symbol, 'price': price, 'qty': qty, 'action': action, 'time': datetime.now()})
class NewsSentiment:
    """Classify recent news headlines per symbol as Positive, Negative or Neutral.

    Headlines come from the News API client and are scored with VADER's
    ``SentimentIntensityAnalyzer``.
    """

    def __init__(self, API_KEY):
        '''
        Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for Sentiment Analysis of Social Media Text. Eighth International Conference on Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014.
        '''
        self.newsapi = NewsApiClient(api_key=API_KEY)
        self.sia = SentimentIntensityAnalyzer()

    def get_news_sentiment(self, symbols, max_articles=5):
        '''
        Return a dict mapping each symbol to 'Positive', 'Negative' or 'Neutral'.

        The compound VADER score of up to ``max_articles`` of the returned
        headlines is averaged over the headlines that were actually scored;
        an average above 0.1 is Positive, below -0.1 is Negative, anything
        else Neutral. A symbol whose lookup raises is logged and reported as
        Neutral.

        The News API has a rate limit of 100 requests per day for free accounts. If you exceed this limit, you'll get a rateLimited error. Example error message:
        ERROR:__main__:Error getting news for APLD: {'status': 'error', 'code': 'rateLimited', 'message': 'You have made too many requests recently. Developer accounts are limited to 100 requests over a 24 hour period (50 requests available every 12 hours). Please upgrade to a paid plan if you need more requests.'}
        '''
        sentiment = {}
        for symbol in symbols:
            try:
                articles = self.newsapi.get_everything(q=symbol,
                                                       language='en',
                                                       sort_by='publishedAt',
                                                       page=1)
                # Articles without a usable title are skipped instead of
                # making the whole symbol fall back to Neutral.
                titles = [
                    article.get('title')
                    for article in articles['articles'][:max_articles]
                ]
                scores = [
                    self.sia.polarity_scores(title)['compound']
                    for title in titles
                    if title
                ]
                # Average over what was scored, not over a fixed count, so a
                # symbol with few headlines is not diluted toward Neutral.
                avg_score = sum(scores) / len(scores) if scores else 0
                if avg_score > 0.1:
                    sentiment[symbol] = 'Positive'
                elif avg_score < -0.1:
                    sentiment[symbol] = 'Negative'
                else:
                    sentiment[symbol] = 'Neutral'
            except Exception as e:
                logger.error(f"Error getting news for {symbol}: {e}")
                sentiment[symbol] = 'Neutral'
        return sentiment
class StockAnalyzer:
    """Select the highest-volume tradable stocks and fetch their price history.

    Attributes:
        alpaca: Object whose ``.alpaca`` attribute is the Alpaca REST client.
        symbols: Top-volume symbols computed at construction time.
        symbol_to_name: Mapping of symbol -> company name.
    """

    def __init__(self, alpaca):
        self.alpaca = alpaca
        self.symbols = self.get_top_volume_stocks()
        # Build a symbol->name mapping for use in plots/tables
        self.symbol_to_name = self.get_symbol_to_name()

    def get_symbol_to_name(self):
        """Return a symbol -> company name mapping for all active Alpaca assets."""
        assets = self.alpaca.alpaca.list_assets(status='active')
        return {asset.symbol: asset.name for asset in assets}

    def get_bars(self, alp_api, symbols, timeframe='1D'):
        """Fetch the most recent bar for each symbol in one batched request.

        Args:
            alp_api: Alpaca REST client exposing ``get_bars(...).df``.
            symbols: Iterable of ticker symbols.
            timeframe: Bar timeframe passed through to the API.

        Returns:
            Dict of symbol -> ``{'bar_data': {...}}`` with volume, open, high,
            low, close and an ISO timestamp, or ``{'bar_data': None}`` when no
            bar is available or the request failed.
        """
        bars_data = {}
        try:
            bars = alp_api.get_bars(list(symbols), timeframe).df
            if 'symbol' not in bars.columns:
                logger.warning("The 'symbol' column is missing in the bars DataFrame.")
                return {symbol: {'bar_data': None} for symbol in symbols}
            for symbol in symbols:
                symbol_bars = bars[bars['symbol'] == symbol]
                if symbol_bars.empty:
                    logger.debug(f"No bar data for symbol: {symbol}")
                    bars_data[symbol] = {'bar_data': None}
                    continue
                bar_info = symbol_bars.iloc[-1]
                # Handle index type for timestamp: with a tuple-valued index
                # entry the timestamp is taken from its second element.
                if isinstance(bar_info.name, tuple):
                    timestamp = bar_info.name[1].isoformat()
                else:
                    timestamp = bar_info.name.isoformat()
                bars_data[symbol] = {
                    'bar_data': {
                        'volume': bar_info['volume'],
                        'open': bar_info['open'],
                        'high': bar_info['high'],
                        'low': bar_info['low'],
                        'close': bar_info['close'],
                        'timestamp': timestamp,
                    }
                }
        except Exception as e:
            logger.warning(f"Error fetching bars in batch: {e}")
            # A batch failure marks every symbol as having no data.
            for symbol in symbols:
                bars_data[symbol] = {'bar_data': None}
        return bars_data

    def assetswithconditions(self, stock_assets):
        """Filter assets down to liquid, fully tradable US equities.

        Args:
            stock_assets: Iterable of Alpaca asset objects.

        Returns:
            Tuple ``(assets, by_symbol)``: the matching asset objects in input
            order, and a dict sorted by symbol mapping each symbol to a copy of
            the asset's raw data plus a ``'firstWord'`` key holding the first
            word of the company name (empty string when the name is blank).
        """
        cond = {
            'class': ['us_equity'],
            'exchange': ['NASDAQ', 'NYSE'],
            'status': ['active'],
            'tradable': [True],
            'marginable': [True],
            'shortable': [True],
            'easy_to_borrow': [True],
            'fractionable': [True],
        }
        assets_with_conditions = []
        asset_symbol_dict = {}
        for asset in stock_assets:
            # Skip symbols with '.' or '/' (preferred shares, warrants, etc.)
            if '.' in asset.symbol or '/' in asset.symbol:
                continue
            # 'class' is a Python keyword, hence getattr for that one field.
            if not all(getattr(asset, field) in allowed for field, allowed in cond.items()):
                continue
            assets_with_conditions.append(asset)
            name_words = (asset.name or '').replace(',', '').split()
            # Copy the raw data so the asset object itself is not modified.
            details = dict(asset._raw)
            details['firstWord'] = name_words[0] if name_words else ''
            asset_symbol_dict[asset.symbol] = details
        sorted_dict = dict(sorted(asset_symbol_dict.items()))
        return assets_with_conditions, sorted_dict

    def get_top_volume_stocks(self, num_stocks=10):
        """Return up to ``num_stocks`` symbols ranked by latest bar volume.

        Returns an empty list when anything in the lookup fails.
        """
        try:
            # Get all active assets, then keep the ones meeting the conditions
            assets = self.alpaca.alpaca.list_assets(status='active')
            _, sorted_dict = self.assetswithconditions(assets)
            # Fetch bar data for all remaining assets
            tradable_assets = self.get_bars(self.alpaca.alpaca, sorted_dict.keys(), timeframe='1D')
            # Extract volume and pick the top stocks by volume
            volume_data = {
                symbol: info['bar_data']['volume']
                for symbol, info in tradable_assets.items()
                if info['bar_data'] is not None
            }
            top_volume_stocks = sorted(volume_data, key=volume_data.get, reverse=True)[:num_stocks]
            logger.info(f'top_volume_stocks = {top_volume_stocks}')
            return top_volume_stocks
        except Exception as e:
            logger.error(f"Error fetching top volume stocks: {e}")
            return []

    def get_historical_data(self, symbols, start='2023-01-01'):
        """Download daily price history for each symbol via yfinance.

        Args:
            symbols: Iterable of ticker symbols.
            start: First date (``YYYY-MM-DD``) of the requested history.

        Returns:
            Dict of symbol -> history DataFrame. Symbols whose download raised
            are logged and left out.
        """
        data = {}
        end = datetime.now().strftime('%Y-%m-%d')
        for symbol in symbols:
            try:
                # Pull daily history from `start` up to today's date
                ticker = yf.Ticker(symbol)
                data[symbol] = ticker.history(start=start, end=end, interval='1d')
            except Exception as e:
                logger.error(f"Error getting data for {symbol}: {e}")
        return data
class TradingApp:
    """Streamlit-facing application object: trading, sentiment, charts.

    Attributes:
        alpaca: ``AlpacaTrader`` bound to the Alpaca paper-trading endpoint.
        sentiment: ``NewsSentiment`` used to classify headlines.
        analyzer: ``StockAnalyzer`` providing the symbols and their names.
        data: Dict of symbol -> price history DataFrame for the charts.
        auto_trade_log: Actions from the latest ``auto_trade_based_on_sentiment`` call.
    """

    def __init__(self):
        self.alpaca = AlpacaTrader(st.secrets['ALPACA_API_KEY'], st.secrets['ALPACA_SECRET_KEY'], 'https://paper-api.alpaca.markets')
        self.sentiment = NewsSentiment(st.secrets['NEWS_API_KEY'])
        self.analyzer = StockAnalyzer(self.alpaca)
        self.data = self.analyzer.get_historical_data(self.analyzer.symbols)
        self.auto_trade_log = []  # Store automatic trade actions

    def display_charts(self):
        """Draw one closing-price line chart per symbol in a three-column grid."""
        symbols = list(self.data.keys())
        if not symbols:
            st.warning("No stock data available to display charts.")
            return  # Exit the function if no symbols are available
        symbol_to_name = self.analyzer.symbol_to_name
        n = len(symbols)
        cols = 3
        rows = (n + cols - 1) // cols  # ceiling division: enough rows for n charts
        subplot_titles = [
            f"{symbol} - {symbol_to_name.get(symbol, '')}" for symbol in symbols
        ]
        fig = make_subplots(rows=rows, cols=cols, subplot_titles=subplot_titles)
        for idx, symbol in enumerate(symbols):
            df = self.data[symbol]
            if df.empty:
                continue  # the subplot keeps its title but stays blank
            fig.add_trace(
                go.Scatter(
                    x=df.index,
                    y=df['Close'],
                    mode='lines',
                    name=symbol,
                    hovertemplate=f"%{{x}}<br>{symbol}: %{{y:.2f}}<extra></extra>"
                ),
                row=idx // cols + 1,
                col=idx % cols + 1,
            )
        fig.update_layout(
            title="Top Volume Stocks - Price Charts (Since 2023)",
            height=max(400 * rows, 600),
            showlegend=False,
            dragmode=False,
        )
        # Enable scroll-zoom for each subplot (individual zoom): apply the
        # setting to every x and y axis of the figure in one call each.
        fig.update_xaxes(fixedrange=False)
        fig.update_yaxes(fixedrange=False)
        st.plotly_chart(fig, use_container_width=True, config={"scrollZoom": True})

    def manual_trade(self):
        """Render the sidebar: manual trade form and account summary table."""
        # Move all user inputs to the sidebar
        with st.sidebar:
            st.header("Manual Trade")
            # Normalise the input so " aapl " and "AAPL" are the same symbol
            # for the price cache, the order and the messages.
            symbol = st.text_input('Enter stock symbol').strip().upper()

            # Fetch the current stock price dynamically using Alpaca's API
            def get_stock_price(symbol):
                try:
                    if not symbol:
                        return None
                    last_trade = self.alpaca.alpaca.get_latest_trade(symbol)
                    return last_trade.price
                except Exception as e:
                    logger.error(f"Error fetching stock price for {symbol}: {e}")
                    return None

            # Update stock price when a new symbol is entered
            if symbol:
                if "stock_price" not in st.session_state or st.session_state.get("last_symbol") != symbol:
                    st.session_state["stock_price"] = get_stock_price(symbol)
                    st.session_state["last_symbol"] = symbol
                stock_price = st.session_state.get("stock_price")
            else:
                # No symbol entered: do not show the price cached for an
                # earlier symbol.
                stock_price = None
            # Explicitly display the stock price below the input field
            if stock_price is not None:
                st.write(f"Current stock price for {symbol.upper()}: ${stock_price:,.2f}")
            else:
                st.write("Enter a valid stock symbol to see the price.")
            # Allow user to enter either quantity or amount
            trade_option = st.radio("Trade Option", ["Enter Quantity", "Enter Amount"])
            qty = st.number_input('Enter quantity', min_value=0.0, step=0.01, value=0.0) if trade_option == "Enter Quantity" else None
            amount = st.number_input('Enter amount ($)', min_value=0.0, step=0.01, value=0.0) if trade_option == "Enter Amount" else None
            # Dynamically calculate the other field
            if stock_price:
                if trade_option == "Enter Quantity" and qty:
                    amount = qty * stock_price
                    st.write(f"Calculated Amount: ${amount:,.2f}")
                elif trade_option == "Enter Amount" and amount:
                    qty = float(amount / stock_price)
                    st.write(f"Calculated Quantity: {qty:,.2f}")
            action = st.selectbox('Action', ['Buy', 'Sell'])
            if st.button('Execute'):
                if stock_price and qty:
                    is_market_open = self.alpaca.get_market_status()
                    if action == 'Buy':
                        order = self.alpaca.buy(symbol, qty, reason="Manual Trade")
                    else:
                        order = self.alpaca.sell(symbol, qty, reason="Manual Trade")
                    if order:
                        if not is_market_open:
                            _, _, next_open, _ = get_market_times(self.alpaca.alpaca)
                            next_open_time = next_open.strftime('%Y-%m-%d %H:%M:%S') if next_open else "unknown"
                            st.warning(f"Market is currently closed. The {action.lower()} order for {qty} shares of {symbol} has been submitted and will execute when the market opens at {next_open_time}.")
                        else:
                            st.success(f"Order executed: {action} {qty} shares of {symbol}")
                    else:
                        st.error("Order failed")
                else:
                    st.error("Please enter a valid stock symbol and trade details.")
            # Display portfolio information in the sidebar
            st.header("Alpaca Cash Portfolio")

            def refresh_portfolio():
                account = self.alpaca.alpaca.get_account()
                portfolio_data = {
                    "Metric": ["Cash Balance", "Buying Power", "Equity", "Portfolio Value"],
                    "Value": [
                        f"${int(float(account.cash)):,.0f}",
                        f"${int(float(account.buying_power)):,.0f}",
                        f"${int(float(account.equity)):,.0f}",
                        f"${int(float(account.portfolio_value)):,.0f}"
                    ]
                }
                df = pd.DataFrame(portfolio_data)
                st.table(df.to_dict(orient="records"))  # Convert DataFrame to a list of dictionaries

            refresh_portfolio()
            st.button("Refresh Portfolio", on_click=refresh_portfolio)

    def auto_trade_based_on_sentiment(self, sentiment):
        """Execute trades based on sentiment analysis and return actions taken."""
        actions = self._execute_sentiment_trades(sentiment)
        self.auto_trade_log = actions
        return actions

    def _market_open_or_none(self):
        """Return True/False for the market clock, or None when it cannot be read."""
        try:
            return bool(self.alpaca.get_market_status())
        except Exception as e:
            logger.error(f"Error fetching market status: {e}")
            return None

    def _execute_sentiment_trades(self, sentiment):
        """Helper method to execute trades based on sentiment.

        Used by both auto_trade_based_on_sentiment and background_auto_trade.
        Buys one share for 'Positive', sells one share for 'Negative' and
        holds otherwise.

        Args:
            sentiment: Dict of symbol -> 'Positive' / 'Negative' / other.

        Returns:
            List of dicts with 'symbol', 'company_name', 'sentiment' and
            'action' ('Buy', 'Sell' or 'Hold'), one per input symbol.
        """
        actions = []
        symbol_to_name = self.analyzer.symbol_to_name
        # The market status only affects how a placed order is logged, so it
        # is looked up at most once per batch and only after an order exists;
        # a clock failure can then no longer abort the trades themselves.
        status_checked = False
        market_open = None
        next_open_time = "unknown"
        for symbol, sentiment_value in sentiment.items():
            if sentiment_value == 'Positive':
                order = self.alpaca.buy(symbol, 1, reason="Sentiment: Positive")
                action = 'Buy'
            elif sentiment_value == 'Negative':
                order = self.alpaca.sell(symbol, 1, reason="Sentiment: Negative")
                action = 'Sell'
            else:
                order = None
                action = 'Hold'
                logger.info(f"Held {symbol}")
            if order:
                if not status_checked:
                    status_checked = True
                    market_open = self._market_open_or_none()
                    if market_open is False:
                        _, _, next_open, _ = get_market_times(self.alpaca.alpaca)
                        next_open_time = next_open.strftime('%Y-%m-%d %H:%M:%S') if next_open else "unknown"
                if market_open is None:
                    logger.info(f"Order submitted: {action} 1 share of {symbol} (market status unavailable)")
                elif not market_open:
                    logger.warning(f"Market is currently closed. The {action.lower()} order for 1 share of {symbol} has been submitted and will execute when the market opens at {next_open_time}.")
                else:
                    logger.info(f"Order executed: {action} 1 share of {symbol}")
            actions.append({
                'symbol': symbol,
                'company_name': symbol_to_name.get(symbol, ''),
                'sentiment': sentiment_value,
                'action': action
            })
        return actions
def background_auto_trade(app):
    """Run sentiment-driven auto-trading forever, once per AUTO_TRADE_INTERVAL.

    Intended as the target of a background thread. Each iteration scores the
    analyzer's symbols, executes the resulting trades and appends a log entry
    (timestamp, actions, sentiment) to the session-state trade log, which is
    capped at the 50 most recent entries.

    Args:
        app: Object exposing ``sentiment``, ``analyzer`` and
            ``_execute_sentiment_trades`` (a ``TradingApp``).
    """
    # NOTE(review): st.session_state is accessed from the thread running this
    # function; whether that reaches the same state as the page depends on how
    # the thread was set up by the caller - confirm against Streamlit's
    # multithreading guidance.
    while True:
        start_time = time.monotonic()  # Record the start time of the iteration
        try:
            sentiment = app.sentiment.get_news_sentiment(app.analyzer.symbols)
            # Use the shared method to execute trades
            actions = app._execute_sentiment_trades(sentiment)
            # Create log entry
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "actions": actions,
                "sentiment": sentiment
            }
            # Update session state - ensure the UI reflects the latest data
            if AUTO_TRADE_LOG_KEY not in st.session_state:
                st.session_state[AUTO_TRADE_LOG_KEY] = []
            st.session_state[AUTO_TRADE_LOG_KEY].append(log_entry)
            # Limit size to avoid memory issues (keep last 50 entries)
            if len(st.session_state[AUTO_TRADE_LOG_KEY]) > 50:
                st.session_state[AUTO_TRADE_LOG_KEY] = st.session_state[AUTO_TRADE_LOG_KEY][-50:]
            # Log the update
            logger.info(f"Auto-trade completed. Actions: {actions}")
        except Exception:
            # One failed iteration must not end the loop: without this guard
            # any error would stop auto-trading until the process restarts.
            logger.exception("Auto-trade iteration failed; retrying after the normal interval.")
        # Calculate the time taken for this iteration
        elapsed_time = time.monotonic() - start_time
        sleep_time = max(0, AUTO_TRADE_INTERVAL - elapsed_time)  # Ensure non-negative sleep time
        logger.info(f"Sleeping for {sleep_time:.2f} seconds before the next auto-trade.")
        time.sleep(sleep_time)
def get_auto_trade_log():
    """Return the auto-trade log list kept in session state.

    An empty list is stored under AUTO_TRADE_LOG_KEY first when the key is
    not present yet.
    """
    state = st.session_state
    log_missing = AUTO_TRADE_LOG_KEY not in state
    if log_missing:
        state[AUTO_TRADE_LOG_KEY] = []
    return state[AUTO_TRADE_LOG_KEY]
def get_market_times(alpaca_api):
    """Read the Alpaca market clock and convert its times to New York time.

    Args:
        alpaca_api: Client exposing ``get_clock()``.

    Returns:
        Tuple ``(is_open, now, next_open, next_close)`` where the three times
        are pandas Timestamps in 'America/New_York'. Any failure is logged and
        yields ``(None, None, None, None)``.
    """
    try:
        clock = alpaca_api.get_clock()
        market_is_open = clock.is_open
        # Converted lazily in this order: current time, next close, next open.
        current, closes_at, opens_at = (
            pd.Timestamp(getattr(clock, field)).tz_convert('America/New_York')
            for field in ('timestamp', 'next_close', 'next_open')
        )
    except Exception as e:
        logger.error(f"Error fetching market times: {e}")
        return None, None, None, None
    return market_is_open, current, opens_at, closes_at
def main():
    """Render the page: holdings, market status, manual trading, charts, auto-trade log.

    Stops after showing an error when a required secret is missing. Creates
    the TradingApp and starts the background auto-trade thread once per
    session, tracked through session state.
    """
    st.title("Ben's Stock Trading Application")
    st.markdown("This is a fun stock trading application that uses Alpaca API for trading and News API for sentiment analysis. Come and trade my money! Well, it's a paper account, so it's not real money. But still, have fun!")
    # All three secrets are read when TradingApp is constructed, so all three
    # are checked here; a missing key is reported instead of raising.
    required_secrets = ("ALPACA_API_KEY", "ALPACA_SECRET_KEY", "NEWS_API_KEY")
    try:
        missing = [key for key in required_secrets if not st.secrets.get(key)]
    except Exception as e:
        logger.error(f"Unable to read Streamlit secrets: {e}")
        missing = list(required_secrets)
    if missing:
        st.error(f"Please configure your {', '.join(missing)}")
        return
    # Prevent Streamlit from rerunning the script on every widget interaction
    # Use session state to persist objects and only update when necessary
    if "app_instance" not in st.session_state:
        st.session_state["app_instance"] = TradingApp()
    app = st.session_state["app_instance"]
    # Create two columns for market status and portfolio holdings
    col1, col2 = st.columns([1, 1])
    # Column 1: Portfolio holdings bar chart
    with col1:
        st.subheader("Portfolio Holdings")
        holdings_container = st.empty()  # Create a container for dynamic updates

        def update_holdings():
            holdings = app.alpaca.getHoldings()
            if holdings:
                df = pd.DataFrame(list(holdings.items()), columns=['Ticker', 'Market Value'])
                fig = go.Figure(
                    data=[
                        go.Bar(
                            x=df['Ticker'],
                            y=df['Market Value'],
                            marker=dict(color=df['Market Value'], colorscale='Viridis'),
                        )
                    ]
                )
                fig.update_layout(
                    xaxis_title="Ticker",
                    yaxis_title="$ USD",
                    height=400,
                )
                # Use a unique key by appending the current timestamp
                holdings_container.plotly_chart(fig, use_container_width=True, key=f"portfolio_holdings_chart_{time.time()}")
            else:
                holdings_container.info("No holdings to display.")

        # Draw the holdings plot on every run; the button redraws it on demand
        update_holdings()
        st.button("Refresh Holdings", on_click=update_holdings)
        # Add an expandable section for detailed holdings
        st.subheader("Detailed Holdings")
        with st.expander("View Detailed Holdings"):
            holdings = app.alpaca.getHoldings()  # Fresh symbol -> market value mapping
            if holdings:
                detailed_holdings = pd.DataFrame(
                    [{"Ticker": ticker, "Amount (USD)": round(value)} for ticker, value in holdings.items()]
                )
                st.table(detailed_holdings)
            else:
                st.info("No holdings to display.")
    # Column 2: Market status
    with col2:
        is_open, now, next_open, next_close = get_market_times(app.alpaca.alpaca)
        # Emoji are written as escapes so the file's encoding cannot garble them.
        if is_open is None:
            # get_market_times returns None values when the clock lookup failed
            market_status = "\u26AA Market status unavailable"
        elif is_open:
            market_status = "\U0001F7E2 Market is OPEN"
        else:
            market_status = "\U0001F534 Market is CLOSED"
        st.markdown(f"### {market_status}")
        if now is not None:
            st.markdown(f"**Current time (ET):** {now.strftime('%Y-%m-%d %H:%M:%S')}")
            if is_open and next_close is not None:
                st.markdown(f"**Market closes at:** {next_close.strftime('%Y-%m-%d %H:%M:%S')} ET")
                seconds_left = int((next_close - now).total_seconds())
                st.markdown(f"**Time until close:** {pd.to_timedelta(seconds_left, unit='s')}")
            elif not is_open and next_open is not None:
                st.markdown(f"**Market opens at:** {next_open.strftime('%Y-%m-%d %H:%M:%S')} ET")
                seconds_left = int((next_open - now).total_seconds())
                st.markdown(f"**Time until open:** {pd.to_timedelta(seconds_left, unit='s')}")
    # Initialize auto trade log in session state if needed
    if AUTO_TRADE_LOG_KEY not in st.session_state:
        st.session_state[AUTO_TRADE_LOG_KEY] = []
    # Only start the background thread once
    if "auto_trade_thread_started" not in st.session_state:
        thread = threading.Thread(target=background_auto_trade, args=(app,), daemon=True)
        thread.start()
        st.session_state["auto_trade_thread_started"] = True
    # Main area: plots and data
    app.manual_trade()
    app.display_charts()
    # Read and display latest auto-trade actions
    st.write("Automatic Trading Actions Based on Sentiment (background):")
    auto_trade_log = get_auto_trade_log()
    if not auto_trade_log:
        st.info("Waiting for first background auto-trade run...")
        return
    # Show the most recent entry
    last_entry = auto_trade_log[-1]
    st.write(f"Last checked: {last_entry['timestamp']}")
    df = pd.DataFrame(last_entry["actions"])
    if "company_name" in df.columns:
        df = df[["symbol", "company_name", "sentiment", "action"]]
    st.dataframe(df)
    st.write("Sentiment Analysis (latest):")
    st.write(last_entry["sentiment"])
    # Plot buy/sell actions over time
    st.write("Auto-Trading History (Buy/Sell Actions Over Time):")
    history = [
        {"timestamp": entry["timestamp"], "symbol": act["symbol"], "action": act["action"]}
        for entry in auto_trade_log
        for act in entry["actions"]
        if act["action"] in ("Buy", "Sell")
    ]
    if history:
        hist_df = pd.DataFrame(history)
        hist_df["timestamp"] = pd.to_datetime(hist_df["timestamp"])
        # Buy counts +1 and Sell -1; only those two actions reach this point.
        hist_df["action_value"] = hist_df["action"].map({"Buy": 1, "Sell": -1}).astype(float)
        pivot = hist_df.pivot_table(index="timestamp", columns="symbol", values="action_value", aggfunc="sum")
        st.line_chart(pivot.fillna(0))
# Run the app only when this file is executed as the main script, not when imported.
if __name__ == "__main__":
main()