"""Job monitoring dashboard.

Displays results from a custom orchestration tool that uses Postgres to
monitor and rerun failed jobs (this demo runs against SQLite; see below).
"""
import streamlit as st
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import (
    create_engine, text, bindparam,
    Column, Integer, String, DateTime, Boolean,
)
from sqlalchemy.orm import Session, DeclarativeBase
import plotly.express as px
# Database setup
class Base(DeclarativeBase):
pass
class Job(Base):
__tablename__ = 'jobs'
id = Column(Integer, primary_key=True)
job_name = Column(String)
status = Column(String)
start_time = Column(DateTime)
end_time = Column(DateTime)
error_message = Column(String, nullable=True)
retries = Column(Integer, default=0)
is_active = Column(Boolean, default=True)
# Create engine and tables
engine = create_engine('sqlite:///jobs.db')
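# Note: the docstring above targets Postgres; SQLite keeps this demo
# self-contained. To point at a real Postgres instance instead, swap the URL
# (hypothetical placeholder credentials shown):
# engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/orchestrator')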
Base.metadata.create_all(bind=engine)
# Page title and description
st.title('Job Monitoring Dashboard')
st.markdown("""
This dashboard allows you to monitor job execution status and manage failed jobs.
You can view job statistics, analyze failures, and trigger reruns of failed jobs.
""")
# Sidebar filters
st.sidebar.header('Filters')
with Session(engine) as session:
# Get unique job names for filter
job_names = session.query(Job.job_name).distinct().all()
job_names = [job[0] for job in job_names]
selected_jobs = st.sidebar.multiselect('Filter by Job Name', job_names, default=job_names)
# Date range filter
date_range = st.sidebar.date_input(
'Date Range',
value=(datetime.now() - timedelta(days=7), datetime.now()),
max_value=datetime.now()
)
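
# st.date_input returns a 1-element tuple while the user is still picking a
# range; stop this script run until both endpoints are selected so the query
# below always receives a start and an end date
if len(date_range) != 2:
    st.stop()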
# Function to load job data
def load_job_data():
    with Session(engine) as session:
        # expanding=True makes SQLAlchemy render the IN clause from a Python list
        query = text("""
            SELECT *
            FROM jobs
            WHERE job_name IN :job_names
              AND start_time >= :start_date
              AND start_time < :end_date
        """).bindparams(bindparam('job_names', expanding=True))
        df = pd.read_sql(
            query,
            session.bind,
            params={
                'job_names': list(selected_jobs) if selected_jobs else list(job_names),
                'start_date': datetime.combine(date_range[0], datetime.min.time()),
                # exclusive upper bound one day past the picked end date,
                # so the whole final day is included
                'end_date': datetime.combine(date_range[1] + timedelta(days=1), datetime.min.time()),
            },
            # SQLite hands datetimes back as strings; parse them for the charts below
            parse_dates=['start_time', 'end_time'],
        )
    return df
# Load data
df = load_job_data()
# Dashboard metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric('Total Jobs', len(df))
with col2:
    # .mean() over an empty frame is NaN; fall back to 0 so the metric renders
    success_rate = (df['status'] == 'SUCCESS').mean() * 100 if not df.empty else 0.0
    st.metric('Success Rate', f'{success_rate:.1f}%')
with col3:
failed_jobs = len(df[df['status'] == 'FAILED'])
st.metric('Failed Jobs', failed_jobs)
with col4:
avg_duration = (df['end_time'] - df['start_time']).mean()
if pd.notna(avg_duration):
avg_duration_mins = avg_duration.total_seconds() / 60
st.metric('Avg Duration (mins)', f'{avg_duration_mins:.1f}')
# Job Status Timeline
st.subheader('Job Execution Timeline')
# Jobs still running have no end_time yet; draw them as running up to "now"
# so Plotly does not drop those rows from the Gantt-style chart
timeline_df = df.copy()
timeline_df['end_time'] = timeline_df['end_time'].fillna(pd.Timestamp.now())
fig = px.timeline(
    timeline_df,
    x_start='start_time',
    x_end='end_time',
    y='job_name',
    color='status',
    title='Job Execution Timeline'
)
st.plotly_chart(fig, use_container_width=True)
# Failed Jobs Table
st.subheader('Failed Jobs')
failed_df = df[df['status'] == 'FAILED'].copy()
if not failed_df.empty:
    failed_jobs_display = failed_df[['job_name', 'start_time', 'error_message', 'retries']].copy()
    st.table(failed_jobs_display)
    # Let the user pick which failed runs to requeue, identified by row id
    selected_ids = st.multiselect(
        'Select failed jobs to rerun',
        options=failed_df['id'].tolist(),
        format_func=lambda i: f"{failed_df.loc[failed_df['id'] == i, 'job_name'].iloc[0]} (id {i})",
    )
    if st.button('Rerun Selected Failed Jobs', disabled=not selected_ids):
        with Session(engine) as session:
            for job_id in selected_ids:
                # Requeue the job and increment its retry count
                job_record = session.get(Job, int(job_id))
                if job_record:
                    job_record.status = 'QUEUED'
                    job_record.retries += 1
                    job_record.start_time = datetime.now()
                    job_record.end_time = None
            session.commit()
        st.success('Selected jobs have been queued for rerun!')
        st.rerun()
else:
st.info('No failed jobs in the selected time period.')
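
# The rerun flow above only flips status back to 'QUEUED'; the orchestration
# tool itself is assumed to poll for queued jobs and execute them out of
# process. A minimal, hypothetical sketch of that poller follows (this app
# never calls it; the real executor lives in the orchestration tool):
def poll_and_run_queued_jobs():
    with Session(engine) as session:
        for job in session.query(Job).filter_by(status='QUEUED', is_active=True).all():
            job.status = 'RUNNING'
            session.commit()
            # Placeholder outcome: a real runner would execute the job here and
            # set SUCCESS or FAILED (plus error_message) based on the result
            job.status = 'SUCCESS'
            job.end_time = datetime.now()
            session.commit()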
# Job Statistics
st.subheader('Job Statistics')
col1, col2 = st.columns(2)
with col1:
# Status distribution
status_counts = df['status'].value_counts()
fig = px.pie(values=status_counts.values, names=status_counts.index, title='Job Status Distribution')
st.plotly_chart(fig, use_container_width=True)
with col2:
# Retry distribution
retry_counts = df['retries'].value_counts().sort_index()
fig = px.bar(x=retry_counts.index, y=retry_counts.values, title='Retry Distribution',
labels={'x': 'Number of Retries', 'y': 'Count'})
st.plotly_chart(fig, use_container_width=True)
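
# Per-job failure rate as a quick complement to the charts above; a simple
# pandas aggregation over the already-loaded frame (no extra queries needed)
per_job_failure = (
    df.assign(failed=df['status'].eq('FAILED'))
      .groupby('job_name')['failed']
      .mean()
      .mul(100)
      .round(1)
      .rename('Failure Rate (%)')
)
st.dataframe(per_job_failure, use_container_width=True)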
# Sample data generation (remove in production)
def generate_sample_data():
    """Seed demo rows; returns True when new rows were inserted."""
    with Session(engine) as session:
        # Seed only once: skip if any rows already exist
        if session.query(Job).first() is not None:
            return False
        sample_names = ['ETL_Process', 'Data_Validation', 'Model_Training', 'API_Health_Check']
        statuses = ['SUCCESS', 'FAILED', 'RUNNING']
        for i in range(50):
            start_time = datetime.now() - timedelta(days=i % 7, hours=i % 24)
            end_time = start_time + timedelta(minutes=30)
            status = statuses[i % len(statuses)]
            job = Job(
                job_name=sample_names[i % len(sample_names)],
                status=status,
                start_time=start_time,
                end_time=end_time if status != 'RUNNING' else None,
                error_message='Sample error message' if status == 'FAILED' else None,
                retries=i % 3 if status == 'FAILED' else 0
            )
            session.add(job)
        session.commit()
        return True

# Seed demo data on first run; rerun immediately so the dashboard rendered
# above picks up the new rows instead of showing an empty first page
if generate_sample_data():
    st.rerun()