I need a SaaS web app for tracking stocks and creating stock recommendations. I need user accounts with Stripe integration for premium accounts. I need this written in Python and Flask with a good UI framework that I can easily build new premium pages in.
To upload files, please first save the app
import streamlit as st
import pandas as pd
import numpy as np
import yfinance as yf
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import stripe
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, Text
from sqlalchemy.orm import DeclarativeBase, Session
from datetime import datetime, timedelta
import bcrypt
from st_aggrid import AgGrid, GridOptionsBuilder
import os
# Initialize Stripe (use test keys for demo)
stripe.api_key = "sk_test_..." # Replace with your actual test key
# Database setup
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
is_premium = Column(Boolean, default=False)
stripe_customer_id = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
class Stock(Base):
__tablename__ = "stocks"
id = Column(Integer, primary_key=True)
symbol = Column(String(10), nullable=False)
name = Column(String(100))
current_price = Column(Float)
change_percent = Column(Float)
last_updated = Column(DateTime, default=datetime.utcnow)
class Recommendation(Base):
__tablename__ = "recommendations"
id = Column(Integer, primary_key=True)
symbol = Column(String(10), nullable=False)
recommendation = Column(String(20)) # BUY, SELL, HOLD
target_price = Column(Float)
reasoning = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
user_id = Column(Integer)
# Create database
engine = create_engine("sqlite:///stock_tracker.db")
Base.metadata.create_all(bind=engine)
# Helper functions
def hash_password(password):
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
def verify_password(password, hashed):
return bcrypt.checkpw(password.encode('utf-8'), hashed)
def get_stock_data(symbol):
try:
stock = yf.Ticker(symbol)
info = stock.info
hist = stock.history(period="1y")
current_price = info.get('currentPrice', hist['Close'].iloc[-1])
previous_close = info.get('previousClose', hist['Close'].iloc[-2])
change_percent = ((current_price - previous_close) / previous_close) * 100
return {
'symbol': symbol.upper(),
'name': info.get('longName', symbol.upper()),
'current_price': current_price,
'change_percent': change_percent,
'history': hist
}
except Exception as e:
st.error(f"Error fetching data for {symbol}: {str(e)}")
return None
def create_stripe_customer(email):
try:
customer = stripe.Customer.create(email=email)
return customer.id
except Exception as e:
st.error(f"Error creating Stripe customer: {str(e)}")
return None
def create_checkout_session(customer_id):
try:
session = stripe.checkout.Session.create(
customer=customer_id,
payment_method_types=['card'],
line_items=[{
'price_data': {
'currency': 'usd',
'product_data': {
'name': 'Stock Tracker Premium',
'description': 'Premium features including advanced analytics and recommendations'
},
'unit_amount': 2999, # $29.99
'recurring': {
'interval': 'month'
}
},
'quantity': 1,
}],
mode='subscription',
success_url='http://localhost:8501/?success=true',
cancel_url='http://localhost:8501/?cancelled=true'
)
return session.url
except Exception as e:
st.error(f"Error creating checkout session: {str(e)}")
return None
# Initialize session state
if 'user' not in st.session_state:
st.session_state.user = None
if 'watchlist' not in st.session_state:
st.session_state.watchlist = []
# Sidebar for navigation
st.sidebar.title("Stock Tracker SaaS")
if st.session_state.user:
st.sidebar.write(f"Welcome, {st.session_state.user['username']}!")
if st.session_state.user['is_premium']:
st.sidebar.success("🌟 Premium Account")
else:
st.sidebar.info("📊 Free Account")
if st.sidebar.button("Upgrade to Premium"):
st.session_state.page = "upgrade"
if st.sidebar.button("Logout"):
st.session_state.user = None
st.session_state.watchlist = []
st.rerun()
# Main navigation
if st.session_state.user:
page = st.sidebar.selectbox("Navigate", [
"Dashboard",
"Stock Search",
"Watchlist",
"Recommendations",
"Premium Analytics" if st.session_state.user['is_premium'] else "Upgrade to Premium"
])
else:
page = st.sidebar.selectbox("Navigate", ["Login", "Register"])
# Authentication pages
if page == "Login":
st.title("Login")
with st.form("login_form"):
username = st.text_input("Username")
password = st.text_input("Password", type="password")
submitted = st.form_submit_button("Login")
if submitted:
with Session(engine) as session:
user = session.query(User).filter_by(username=username).first()
if user and verify_password(password, user.password_hash):
st.session_state.user = {
'id': user.id,
'username': user.username,
'email': user.email,
'is_premium': user.is_premium,
'stripe_customer_id': user.stripe_customer_id
}
st.success("Login successful!")
st.rerun()
else:
st.error("Invalid username or password")
elif page == "Register":
st.title("Register")
with st.form("register_form"):
username = st.text_input("Username")
email = st.text_input("Email")
password = st.text_input("Password", type="password")
confirm_password = st.text_input("Confirm Password", type="password")
submitted = st.form_submit_button("Register")
if submitted:
if password != confirm_password:
st.error("Passwords do not match")
else:
with Session(engine) as session:
# Check if user already exists
existing_user = session.query(User).filter(
(User.username == username) | (User.email == email)
).first()
if existing_user:
st.error("Username or email already exists")
else:
# Create Stripe customer
customer_id = create_stripe_customer(email)
# Create user
new_user = User(
username=username,
email=email,
password_hash=hash_password(password),
stripe_customer_id=customer_id
)
session.add(new_user)
session.commit()
st.success("Registration successful! Please login.")
# Main application pages
elif page == "Dashboard":
st.title("📈 Stock Tracker Dashboard")
# Market overview
st.subheader("Market Overview")
# Sample market data
market_indices = ['SPY', 'QQQ', 'DIA']
market_data = []
for symbol in market_indices:
data = get_stock_data(symbol)
if data:
market_data.append(data)
if market_data:
cols = st.columns(len(market_data))
for i, data in enumerate(market_data):
with cols[i]:
color = "green" if data['change_percent'] > 0 else "red"
st.metric(
label=data['symbol'],
value=f"${data['current_price']:.2f}",
delta=f"{data['change_percent']:.2f}%"
)
# Recent recommendations
st.subheader("Recent Recommendations")
with Session(engine) as session:
recommendations = session.query(Recommendation).order_by(
Recommendation.created_at.desc()
).limit(5).all()
if recommendations:
rec_data = []
for rec in recommendations:
rec_data.append({
'Symbol': rec.symbol,
'Recommendation': rec.recommendation,
'Target Price': f"${rec.target_price:.2f}" if rec.target_price else "N/A",
'Date': rec.created_at.strftime('%Y-%m-%d')
})
df = pd.DataFrame(rec_data)
st.table(df)
else:
st.info("No recommendations yet. Add some stocks to get started!")
elif page == "Stock Search":
st.title("🔍 Stock Search")
symbol = st.text_input("Enter stock symbol (e.g., AAPL, GOOGL, MSFT)")
if symbol:
data = get_stock_data(symbol)
if data:
col1, col2 = st.columns(2)
with col1:
st.metric(
label=f"{data['name']} ({data['symbol']})",
value=f"${data['current_price']:.2f}",
delta=f"{data['change_percent']:.2f}%"
)
if st.button("Add to Watchlist"):
if symbol.upper() not in st.session_state.watchlist:
st.session_state.watchlist.append(symbol.upper())
st.success(f"Added {symbol.upper()} to watchlist!")
else:
st.warning(f"{symbol.upper()} is already in your watchlist!")
with col2:
if st.session_state.user and st.session_state.user['is_premium']:
# Advanced metrics for premium users
st.subheader("Premium Analytics")
# Calculate some basic analytics
hist = data['history']
volatility = hist['Close'].pct_change().std() * np.sqrt(252) * 100
avg_volume = hist['Volume'].mean()
st.metric("Volatility (Annual)", f"{volatility:.2f}%")
st.metric("Avg Volume", f"{avg_volume:,.0f}")
else:
st.info("🌟 Upgrade to Premium for advanced analytics!")
# Price chart
st.subheader("Price Chart")
fig = go.Figure()
hist = data['history']
fig.add_trace(go.Scatter(
x=hist.index,
y=hist['Close'],
mode='lines',
name='Close Price'
))
fig.update_layout(
title=f"{data['symbol']} Stock Price",
xaxis_title="Date",
yaxis_title="Price ($)",
hovermode='x unified'
)
st.plotly_chart(fig, use_container_width=True)
elif page == "Watchlist":
st.title("👀 Your Watchlist")
if st.session_state.watchlist:
watchlist_data = []
for symbol in st.session_state.watchlist:
data = get_stock_data(symbol)
if data:
watchlist_data.append({
'Symbol': data['symbol'],
'Name': data['name'],
'Price': data['current_price'],
'Change %': data['change_percent']
})
if watchlist_data:
df = pd.DataFrame(watchlist_data)
# Configure AgGrid
gb = GridOptionsBuilder.from_dataframe(df)
gb.configure_pagination(paginationAutoPageSize=True)
gb.configure_side_bar()
# Color coding for positive/negative changes
cell_style = {
'Change %': {
'cellStyle': {
'function': '''
function(params) {
if (params.value > 0) {
return {color: 'green'};
} else if (params.value < 0) {
return {color: 'red'};
}
return null;
}
'''
}
}
}
grid_response = AgGrid(
df,
gridOptions=gb.build(),
theme="streamlit",
fit_columns_on_grid_load=True,
allow_unsafe_jscode=True
)
# Remove from watchlist
st.subheader("Remove from Watchlist")
remove_symbol = st.selectbox("Select stock to remove", st.session_state.watchlist)
if st.button("Remove"):
st.session_state.watchlist.remove(remove_symbol)
st.success(f"Removed {remove_symbol} from watchlist!")
st.rerun()
else:
st.info("Your watchlist is empty. Add some stocks from the Stock Search page!")
elif page == "Recommendations":
st.title("💡 Stock Recommendations")
if st.session_state.user and st.session_state.user['is_premium']:
# Premium users can create recommendations
st.subheader("Create Recommendation")
with st.form("recommendation_form"):
symbol = st.text_input("Stock Symbol")
recommendation = st.selectbox("Recommendation", ["BUY", "SELL", "HOLD"])
target_price = st.number_input("Target Price ($)", min_value=0.0, step=0.01)
reasoning = st.text_area("Reasoning")
submitted = st.form_submit_button("Add Recommendation")
if submitted and symbol:
with Session(engine) as session:
new_rec = Recommendation(
symbol=symbol.upper(),
recommendation=recommendation,
target_price=target_price if target_price > 0 else None,
reasoning=reasoning,
user_id=st.session_state.user['id']
)
session.add(new_rec)
session.commit()
st.success("Recommendation added!")
st.rerun()
# Show all recommendations
st.subheader("All Recommendations")
with Session(engine) as session:
recommendations = session.query(Recommendation).order_by(
Recommendation.created_at.desc()
).all()
if recommendations:
rec_data = []
for rec in recommendations:
rec_data.append({
'Symbol': rec.symbol,
'Recommendation': rec.recommendation,
'Target Price': f"${rec.target_price:.2f}" if rec.target_price else "N/A",
'Reasoning': rec.reasoning[:100] + "..." if len(rec.reasoning) > 100 else rec.reasoning,
'Date': rec.created_at.strftime('%Y-%m-%d %H:%M')
})
df = pd.DataFrame(rec_data)
AgGrid(df, theme="streamlit", fit_columns_on_grid_load=True)
else:
st.info("No recommendations yet.")
elif page == "Premium Analytics":
if st.session_state.user and st.session_state.user['is_premium']:
st.title("🌟 Premium Analytics")
st.subheader("Advanced Portfolio Analysis")
# Portfolio performance simulator
st.subheader("Portfolio Performance Simulator")
if st.session_state.watchlist:
selected_stocks = st.multiselect("Select stocks for analysis", st.session_state.watchlist)
weights = []
if selected_stocks:
st.write("Enter portfolio weights (must sum to 1.0):")
for stock in selected_stocks:
weight = st.number_input(f"{stock} weight", min_value=0.0, max_value=1.0, value=1.0/len(selected_stocks), step=0.01)
weights.append(weight)
if abs(sum(weights) - 1.0) < 0.01: # Allow small rounding errors
if st.button("Analyze Portfolio"):
# Calculate portfolio performance
portfolio_data = []
for stock in selected_stocks:
data = get_stock_data(stock)
if data:
portfolio_data.append(data)
if portfolio_data:
# Create portfolio performance chart
fig = go.Figure()
for i, data in enumerate(portfolio_data):
hist = data['history']
returns = hist['Close'].pct_change()
cumulative_returns = (1 + returns).cumprod()
fig.add_trace(go.Scatter(
x=hist.index,
y=cumulative_returns,
mode='lines',
name=f"{data['symbol']} ({weights[i]*100:.1f}%)"
))
fig.update_layout(
title="Portfolio Performance",
xaxis_title="Date",
yaxis_title="Cumulative Returns",
hovermode='x unified'
)
st.plotly_chart(fig, use_container_width=True)
else:
st.error("Portfolio weights must sum to 1.0")
else:
st.info("Add stocks to your watchlist to analyze portfolio performance.")
# Risk metrics
st.subheader("Risk Analysis")
risk_symbol = st.text_input("Enter symbol for risk analysis")
if risk_symbol:
data = get_stock_data(risk_symbol)
if data:
hist = data['history']
returns = hist['Close'].pct_change().dropna()
# Calculate risk metrics
volatility = returns.std() * np.sqrt(252) * 100
var_95 = np.percentile(returns, 5) * 100
max_drawdown = ((hist['Close'] / hist['Close'].cummax()) - 1).min() * 100
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Annual Volatility", f"{volatility:.2f}%")
with col2:
st.metric("VaR (95%)", f"{var_95:.2f}%")
with col3:
st.metric("Max Drawdown", f"{max_drawdown:.2f}%")
# Risk distribution chart
fig = go.Figure()
fig.add_trace(go.Histogram(
x=returns * 100,
nbinsx=50,
name="Daily Returns"
))
fig.update_layout(
title="Return Distribution",
xaxis_title="Daily Returns (%)",
yaxis_title="Frequency"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.title("🌟 Upgrade to Premium")
st.write("Unlock advanced analytics and features!")
premium_features = [
"Advanced portfolio analysis",
"Risk metrics and VaR calculations",
"Custom stock recommendations",
"Real-time alerts",
"Advanced charting tools",
"Export data to Excel/CSV"
]
for feature in premium_features:
st.write(f"✅ {feature}")
if st.button("Upgrade Now - $29.99/month"):
if st.session_state.user and st.session_state.user['stripe_customer_id']:
checkout_url = create_checkout_session(st.session_state.user['stripe_customer_id'])
if checkout_url:
st.markdown(f"[Click here to upgrade]({checkout_url})")
else:
st.error("Please ensure you're logged in to upgrade.")
elif page == "Upgrade to Premium":
st.title("🌟 Upgrade to Premium")
st.write("Unlock advanced analytics and features!")
premium_features = [
"Advanced portfolio analysis",
"Risk metrics and VaR calculations",
"Custom stock recommendations",
"Real-time alerts",
"Advanced charting tools",
"Export data to Excel/CSV"
]
for feature in premium_features:
st.write(f"✅ {feature}")
if st.button("Upgrade Now - $29.99/month"):
if st.session_state.user and st.session_state.user['stripe_customer_id']:
checkout_url = create_checkout_session(st.session_state.user['stripe_customer_id'])
if checkout_url:
st.markdown(f"[Click here to upgrade]({checkout_url})")
else:
st.error("Please ensure you're logged in to upgrade.")
# Handle Stripe webhook success
if st.query_params.get('success') == 'true':
st.success("Payment successful! Your account has been upgraded to Premium.")
# In a real app, you'd verify the payment with Stripe webhooks
# and update the user's premium status in the database
elif st.query_params.get('cancelled') == 'true':
st.warning("Payment was cancelled.")
Hi! I can help you with any questions about Streamlit and Python. What would you like to know?