import os
import io
import json
import time
import math
import random
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
import numpy as np
import pandas as pd
import streamlit as st
# Optional deps: these are used only if available
try:
import graphviz # for DFG visualization
except Exception:
graphviz = None
try:
    import networkx as nx  # optional; imported for availability but not used below
except Exception:
nx = None
# -----------------------------
# App Config
# -----------------------------
st.set_page_config(
page_title="Process Mining & Simulation",
layout="wide",
    page_icon="📊",
)
# -----------------------------
# Utility functions
# -----------------------------
REQUIRED_EVENT_COLUMNS = ["case_id", "activity", "timestamp"]
def _is_datetime(s: pd.Series) -> bool:
    # Numeric columns would happily parse as epoch offsets, so rule them out first.
    if pd.api.types.is_numeric_dtype(s):
        return False
    try:
        pd.to_datetime(s.dropna().head(25), errors="raise")
        return True
    except Exception:
        return False
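# For example, _is_datetime(pd.Series(['2024-01-01 08:00'])) is True, while
# _is_datetime(pd.Series([1, 2, 3])) is False thanks to the numeric guard above.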
def detect_event_log_columns(df: pd.DataFrame) -> Optional[Dict[str, str]]:
"""Best-effort detection of case_id, activity, timestamp columns."""
    cols = list(df.columns)
lower = {c.lower(): c for c in cols}
mapping = {}
# Heuristics by common names
for key, candidates in {
"case_id": ["case_id", "case", "trace", "processinstanceid", "id"],
"activity": ["activity", "event", "task", "step", "lifecycle:transition"],
"timestamp": ["timestamp", "time", "start_time", "end_time", "date"]
}.items():
for cand in candidates:
if cand in lower:
mapping[key] = lower[cand]
break
# Validate timestamp
if mapping.get("timestamp") and not _is_datetime(df[mapping["timestamp"]]):
mapping.pop("timestamp", None)
# If incomplete, try to guess by dtype patterns
if "case_id" not in mapping:
# pick a column with low cardinality but not 1 and not timestamp
candidates = [
c for c in cols
if df[c].nunique() <= max(1000, len(df)//5) and not _is_datetime(df[c])
]
if candidates:
mapping["case_id"] = candidates[0]
if "activity" not in mapping:
# pick a text-like column with moderate cardinality
candidates = [c for c in cols if df[c].dtype == 'object']
if candidates:
mapping["activity"] = candidates[0]
if "timestamp" not in mapping:
candidates = [c for c in cols if _is_datetime(df[c])]
if candidates:
mapping["timestamp"] = candidates[0]
if set(REQUIRED_EVENT_COLUMNS).issubset(mapping.keys()):
return mapping
return None
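# Illustrative behavior (hypothetical column names): for a frame with columns
# ['Case', 'Task', 'Time'], the name heuristics above yield
# {'case_id': 'Case', 'activity': 'Task', 'timestamp': 'Time'},
# provided the 'Time' column parses as datetimes.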
def normalize_event_log(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
log = df.rename(columns={
mapping['case_id']: 'case_id',
mapping['activity']: 'activity',
mapping['timestamp']: 'timestamp',
}).copy()
log['timestamp'] = pd.to_datetime(log['timestamp'], errors='coerce')
log = log.dropna(subset=['timestamp'])
# Sort and reset index
log = log.sort_values(['case_id', 'timestamp']).reset_index(drop=True)
return log
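# Example call (hypothetical source columns):
#   normalize_event_log(df, {'case_id': 'Case', 'activity': 'Task', 'timestamp': 'Time'})
# returns a copy with the canonical columns case_id/activity/timestamp,
# parsed timestamps, and rows sorted by case and time.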
def compute_directly_follows(log: pd.DataFrame) -> pd.DataFrame:
"""Compute a simple Directly-Follows Graph (DFG) with frequencies and mean durations."""
pairs = []
for cid, grp in log.groupby('case_id', sort=False):
acts = grp['activity'].tolist()
times = grp['timestamp'].tolist()
for i in range(len(acts)-1):
a, b = acts[i], acts[i+1]
dt = (times[i+1] - times[i]).total_seconds()
pairs.append((a, b, dt))
if not pairs:
return pd.DataFrame(columns=['source', 'target', 'count', 'mean_sec'])
df = pd.DataFrame(pairs, columns=['source', 'target', 'dt'])
agg = df.groupby(['source','target']).agg(count=('dt','size'), mean_sec=('dt','mean')).reset_index()
return agg
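# Worked example: a single trace A -> B -> C recorded at 08:00, 09:00, 10:00
# produces two edges, each with count 1 and mean_sec 3600.0:
#   source  target  count  mean_sec
#   A       B       1      3600.0
#   B       C       1      3600.0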
def compute_activity_stats(log: pd.DataFrame) -> pd.DataFrame:
"""Per-activity duration estimates based on time between activity and next in same case."""
rows = []
for cid, grp in log.groupby('case_id', sort=False):
acts = grp['activity'].tolist()
times = grp['timestamp'].tolist()
for i in range(len(acts)):
start = times[i]
end = times[i+1] if i < len(acts)-1 else None
dur = (end - start).total_seconds() if end else np.nan
rows.append((acts[i], dur))
stat = pd.DataFrame(rows, columns=['activity','duration_sec'])
agg = stat.groupby('activity').agg(
events=('duration_sec','size'),
mean_sec=('duration_sec','mean'),
p50_sec=('duration_sec', lambda s: np.nanpercentile(s.dropna(), 50) if s.notna().any() else np.nan),
p90_sec=('duration_sec', lambda s: np.nanpercentile(s.dropna(), 90) if s.notna().any() else np.nan),
).reset_index()
return agg
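# Note: the last event of a case has no successor, so its duration is NaN;
# pandas' mean and the percentile lambdas above skip those NaNs, while
# 'events' still counts every occurrence.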
def plot_dfg_gv(dfg: pd.DataFrame) -> Optional["graphviz.Digraph"]:
if graphviz is None or dfg.empty:
return None
g = graphviz.Digraph(format='svg')
# nodes
nodes = set(dfg['source']).union(set(dfg['target']))
for n in nodes:
g.node(str(n))
# edges
for _, r in dfg.iterrows():
label = f"{int(r['count'])} | {int(r['mean_sec'])}s"
g.edge(str(r['source']), str(r['target']), label=label)
return g
# -----------------------------
# Simple Monte Carlo Simulation
# -----------------------------
@dataclass
class ActivityConfig:
    activity: str
    base_mean_sec: float      # mean duration in seconds per executed event
    cost_per_event: float     # cost in (arbitrary) currency units per executed event
    emission_kg_co2e: float   # kg CO2e emitted per executed event
    defect_rate: float        # probability that the activity introduces a defect
def infer_transition_probabilities(dfg: pd.DataFrame) -> Dict[str, List[Tuple[str, float]]]:
"""From DFG counts compute outgoing probabilities per source node."""
trans: Dict[str, List[Tuple[str, float]]] = {}
if dfg.empty:
return trans
for src, grp in dfg.groupby('source'):
total = grp['count'].sum()
probs = [(t, c/total) for t, c in zip(grp['target'], grp['count'])]
trans[src] = probs
return trans
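# Example: DFG edges A -> B (count 3) and A -> C (count 1) yield
#   trans['A'] == [('B', 0.75), ('C', 0.25)]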
def simulate_cases(
start_nodes: List[str],
trans: Dict[str, List[Tuple[str, float]]],
cfg: Dict[str, ActivityConfig],
n_cases: int = 1000,
mean_scale: float = 1.0,
branch_bias: float = 0.0,
rng_seed: int = 42,
) -> pd.DataFrame:
"""Simulate cases traversing the DFG. If multiple start nodes, pick by frequency.
mean_scale multiplies all mean durations. branch_bias shifts probabilities slightly towards more frequent edges.
"""
rng = random.Random(rng_seed)
def pick_next(options: List[Tuple[str, float]]):
if not options:
return None
        probs = np.array([p for _, p in options], dtype=float)
        if branch_bias != 0:
            # exponent > 1 sharpens towards frequent edges, < 1 flattens towards uniform
            probs = probs ** (1 + branch_bias)
            probs = probs / probs.sum()
        return rng.choices([t for t, _ in options], weights=probs.tolist(), k=1)[0]
results = []
for case in range(n_cases):
# pick a random starting node from provided list
if not start_nodes:
break
node = rng.choice(start_nodes)
t_total = 0.0
cost_total = 0.0
co2_total = 0.0
defect_flag = False
steps = 0
while node is not None and steps < 200: # guard against loops
steps += 1
conf = cfg.get(node)
if conf is not None and conf.base_mean_sec > 0:
                # sample a duration from a lognormal whose mean is ~ base_mean_sec * mean_scale:
                # E[lognormal(mu, sigma)] = exp(mu + sigma**2 / 2), so mu = ln(mean) - sigma**2 / 2
                sigma = 0.5
                mu = math.log(max(conf.base_mean_sec * mean_scale, 1e-3)) - 0.5 * sigma ** 2
                dur = rng.lognormvariate(mu, sigma)
t_total += dur
cost_total += conf.cost_per_event
co2_total += conf.emission_kg_co2e
if rng.random() < conf.defect_rate:
defect_flag = True
            # move to the next activity (None ends the case)
            node = pick_next(trans.get(node, []))
results.append({
'case_id': case,
'cycle_time_sec': t_total,
'cost': cost_total,
'co2_kg': co2_total,
'defect': int(defect_flag),
})
return pd.DataFrame(results)
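# Minimal usage sketch (assuming a normalized log `log` is in scope;
# 'A' stands in for a hypothetical start activity):
#   dfg = compute_directly_follows(log)
#   trans = infer_transition_probabilities(dfg)
#   cfg = {a: ActivityConfig(a, 60.0, 1.0, 0.01, 0.01) for a in log['activity'].unique()}
#   sim = simulate_cases(['A'], trans, cfg, n_cases=100)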
# -----------------------------
# OpenAI helper (optional)
# -----------------------------
def generate_ai_insights(text: str) -> Optional[str]:
"""Stub for OpenAI API. Uses environment variable OPENAI_API_KEY if available.
Returns a short bullet list of insights.
"""
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return None
try:
from openai import OpenAI
client = OpenAI(api_key=api_key)
        prompt = (
            "You are a process mining analyst. Condense the following metrics "
            "into three concise optimization suggestions, each with one concrete lever.\n\n" + text
        )
resp = client.chat.completions.create(
model="gpt-4o-mini", # adjust as needed
messages=[{"role":"user","content": prompt}],
temperature=0.2,
max_tokens=300,
)
return resp.choices[0].message.content
except Exception:
return None
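# Usage sketch: generate_ai_insights(json.dumps({'cases': 42})) returns either
# the model's markdown bullet list or None (no key set, or the request failed).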
# -----------------------------
# Sidebar – Upload & Mapping
# -----------------------------
st.sidebar.title("Data")
up = st.sidebar.file_uploader("Upload CSV/XLSX", type=["csv", "xlsx"])
if up is not None:
if up.name.lower().endswith(".csv"):
df_raw = pd.read_csv(up)
else:
df_raw = pd.read_excel(up)
else:
    st.info("Upload an event log file (CSV/XLSX) or try the sample data below.")
    if st.sidebar.button("Load sample data"):
        # tiny demo log: three variants; stored in session_state so it survives
        # the rerun triggered by the next widget interaction
        data = {
            'case_id': [1, 1, 1, 2, 2, 3, 3, 3, 3],
            'activity': ['A', 'B', 'C', 'A', 'C', 'A', 'B', 'B', 'C'],
            'timestamp': [
                '2024-01-01 08:00', '2024-01-01 09:00', '2024-01-01 10:00',
                '2024-01-02 11:00', '2024-01-02 12:00',
                '2024-01-03 08:30', '2024-01-03 09:00', '2024-01-03 09:30', '2024-01-03 10:30'
            ]
        }
        st.session_state['demo_df'] = pd.DataFrame(data)
    if 'demo_df' in st.session_state:
        df_raw = st.session_state['demo_df']
if 'df_raw' in locals():
    st.success(f"Data loaded: {df_raw.shape[0]} rows × {df_raw.shape[1]} columns")
st.dataframe(df_raw.head(25))
mapping_auto = detect_event_log_columns(df_raw)
    with st.expander("Check column mapping"):
        if mapping_auto:
            st.caption("Detected automatically. Adjust if needed.")
        cols = list(df_raw.columns)
        def _default_idx(key: str, fallback: int) -> int:
            # fall back to a clamped positional default when auto-detection failed
            if mapping_auto and key in mapping_auto:
                return cols.index(mapping_auto[key])
            return min(fallback, len(cols) - 1)
        col_case = st.selectbox("Case ID (case_id)", cols, index=_default_idx("case_id", 0))
        col_act = st.selectbox("Activity (activity)", cols, index=_default_idx("activity", 1))
        col_time = st.selectbox("Timestamp (timestamp)", cols, index=_default_idx("timestamp", 2))
        mapping = {"case_id": col_case, "activity": col_act, "timestamp": col_time}
log = normalize_event_log(df_raw, mapping)
# -----------------------------
# Tabs: Overview | Process | Simulation | Insights
# -----------------------------
tab_overview, tab_process, tab_sim, tab_ai = st.tabs(["📊 Overview", "🕸️ Process", "🧪 Simulation", "🤖 Insights"])
with tab_overview:
    st.subheader("Key metrics")
# Basic KPIs
cases = log['case_id'].nunique()
events = len(log)
activities = log['activity'].nunique()
start_time = log['timestamp'].min()
end_time = log['timestamp'].max()
# Cycle time per case
ct = log.groupby('case_id')['timestamp'].agg(['min','max'])
ct['cycle_time_sec'] = (ct['max'] - ct['min']).dt.total_seconds()
avg_ct = ct['cycle_time_sec'].mean() if not ct.empty else np.nan
c1, c2, c3, c4 = st.columns(4)
c1.metric("Fälle", f"{cases}")
c2.metric("Events", f"{events}")
c3.metric("Aktivitäten", f"{activities}")
c4.metric("Ă Durchlaufzeit", f"{avg_ct/3600:.2f} h" if not math.isnan(avg_ct) else "â")
st.caption(f"Zeitraum: {start_time} bis {end_time}")
st.markdown("---")
    st.subheader("Activity statistics")
act_stats = compute_activity_stats(log)
st.dataframe(act_stats)
with tab_process:
st.subheader("Directly-Follows Graph (DFG)")
dfg = compute_directly_follows(log)
st.dataframe(dfg)
gv = plot_dfg_gv(dfg)
if gv is not None:
st.graphviz_chart(gv)
else:
        st.warning("Graphviz not available – the edge table shows frequencies and mean durations in seconds.")
with tab_sim:
st.subheader("What-if Simulation & Dashboard")
    st.caption("Simulate changes and watch 4 KPIs: emissions, finances, cycle time, quality.")
dfg = compute_directly_follows(log)
trans = infer_transition_probabilities(dfg)
activities_list = sorted(set(log['activity']))
# estimate base means from act_stats
act_stats = compute_activity_stats(log).set_index('activity')
default_cfg = []
for a in activities_list:
        known = a in act_stats.index and not math.isnan(act_stats.loc[a, 'mean_sec'])
        m = float(act_stats.loc[a, 'mean_sec']) if known else 60.0
default_cfg.append({
'activity': a,
'base_mean_sec': round(m,2),
'cost_per_event': 1.0,
'emission_kg_co2e': 0.01,
'defect_rate': 0.01,
})
    st.markdown("**Parameters per activity** (editable)")
cfg_df = st.data_editor(pd.DataFrame(default_cfg), num_rows="dynamic")
    st.markdown("**Global levers**")
colA, colB, colC = st.columns(3)
with colA:
        mean_scale = st.slider("Duration factor (0.5x–2x)", 0.5, 2.0, 1.0, 0.05)
with colB:
        branch_bias = st.slider("Branch weighting (-0.9 to +0.9)", -0.9, 0.9, 0.0, 0.1)
with colC:
        n_cases = st.slider("# simulated cases", 100, 10000, 2000, 100)
    # Determine start nodes as those without incoming edges
    starts = list(set(dfg['source']) - set(dfg['target'])) if not dfg.empty else activities_list[:1]
    if not starts and not dfg.empty:
        # fully cyclic log: fall back to the most frequent source activity
        starts = [dfg.sort_values('count', ascending=False)['source'].iloc[0]]
    cfg_map = {
        r['activity']: ActivityConfig(
            activity=r['activity'],
            # treat missing cells as defaults without turning an intentional 0 into 60
            base_mean_sec=float(r['base_mean_sec']) if pd.notna(r['base_mean_sec']) else 60.0,
            cost_per_event=float(r['cost_per_event']) if pd.notna(r['cost_per_event']) else 0.0,
            emission_kg_co2e=float(r['emission_kg_co2e']) if pd.notna(r['emission_kg_co2e']) else 0.0,
            defect_rate=float(r['defect_rate']) if pd.notna(r['defect_rate']) else 0.0,
        )
        for _, r in cfg_df.iterrows()
    }
    sim_btn = st.button("Run simulation", type="primary")
if sim_btn:
        with st.spinner("Simulating cases..."):
sim = simulate_cases(starts, trans, cfg_map, n_cases=n_cases, mean_scale=mean_scale, branch_bias=branch_bias)
if sim.empty:
            st.error("Simulation produced no data. Check the event log.")
else:
k1, k2, k3, k4 = st.columns(4)
            k1.metric("Avg. cycle time", f"{sim['cycle_time_sec'].mean()/3600:.2f} h")
            k2.metric("Avg. cost/case", f"{sim['cost'].mean():.2f}")
            k3.metric("Avg. emissions/case", f"{sim['co2_kg'].mean():.4f} kg CO₂e")
            k4.metric("Defect rate", f"{sim['defect'].mean()*100:.2f}%")
st.bar_chart(sim[['cycle_time_sec','cost','co2_kg']])
    st.caption("Tip: Use the data editor table above to simulate e.g. automation effects (lower duration/cost) or green energy (lower emissions).")
with tab_ai:
    st.subheader("Automatic text insights (optional)")
    st.caption("If an OPENAI_API_KEY environment variable is set, suggestions are generated here.")
# Compile a short context from previous tabs
try:
dfg = compute_directly_follows(log)
stats = compute_activity_stats(log)
context = {
'kpis': {
'cases': int(log['case_id'].nunique()),
'events': int(len(log)),
'activities': int(log['activity'].nunique()),
},
'top_edges': dfg.sort_values('count', ascending=False).head(5).to_dict(orient='records'),
'act_stats_head': stats.head(5).to_dict(orient='records'),
}
insight = generate_ai_insights(json.dumps(context, ensure_ascii=False))
if insight:
st.markdown(insight)
else:
            st.info("No API key found or the call failed. Set OPENAI_API_KEY to receive insights.")
except Exception as e:
        st.error(f"Error while generating insights: {e}")
else:
st.stop()