Create a Streamlit web app that shows data lineage for a lakehouse consisting of raw, silver, and gold layers, with multiple databases, tables, table-level attributes, data types, transformations, etc. as sources and targets. The lineage flow should show end-to-end, source-to-target lineage for a given attribute based on the search criteria. The interface should provide a search box and filters to select upstream or downstream flow. The web app should have a polished UI built with the streamlit-flow library.
To upload files, please first save the app
import streamlit as st
import pandas as pd
import networkx as nx
from streamlit_flow import Flow, Node, Edge
# Seed session state with defaults for the sidebar controls; keys that are
# already present are left untouched.
_SESSION_DEFAULTS = {'direction': 'both', 'search_term': ''}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
# Sample data structure for data lineage
def load_sample_data():
    """Return the hard-coded sample lineage as ``(nodes, edges)``.

    ``nodes`` maps a ``layer.table`` id to a dict holding the table's
    ``'layer'`` and its ``'attributes'`` (a list of ``name:type`` strings).
    ``edges`` is a list of ``(source, target, transformation)`` tuples.
    """
    # Placeholder data; a real deployment would read this from a database
    # or configuration file.

    def table(layer, *columns):
        # One node record: the layer name plus its column descriptors.
        return {'layer': layer, 'attributes': list(columns)}

    nodes = {
        # Raw layer
        'raw.customer': table(
            'raw', 'customer_id:int', 'name:string', 'email:string',
        ),
        'raw.orders': table(
            'raw', 'order_id:int', 'customer_id:int', 'order_date:timestamp',
        ),
        'raw.products': table(
            'raw', 'product_id:int', 'name:string', 'price:decimal',
        ),
        # Silver layer
        'silver.customer_cleaned': table(
            'silver',
            'customer_id:int', 'name:string', 'email:string',
            'valid_email:boolean',
        ),
        'silver.orders_enriched': table(
            'silver',
            'order_id:int', 'customer_id:int', 'order_date:timestamp',
            'total_amount:decimal',
        ),
        # Gold layer
        'gold.customer_orders': table(
            'gold', 'customer_id:int', 'total_orders:int',
            'lifetime_value:decimal',
        ),
        'gold.daily_sales': table(
            'gold', 'date:date', 'total_sales:decimal', 'order_count:int',
        ),
    }

    edges = [
        # raw -> silver
        ('raw.customer', 'silver.customer_cleaned', 'Clean customer data'),
        ('raw.orders', 'silver.orders_enriched', 'Enrich orders'),
        ('raw.products', 'silver.orders_enriched', 'Add product details'),
        # silver -> gold
        ('silver.customer_cleaned', 'gold.customer_orders',
         'Aggregate customer metrics'),
        ('silver.orders_enriched', 'gold.customer_orders',
         'Calculate customer metrics'),
        ('silver.orders_enriched', 'gold.daily_sales',
         'Aggregate daily sales'),
    ]
    return nodes, edges
def create_lineage_graph(nodes, edges, search_term='', direction='both'):
    """Build the lineage graph selected by a search term and a direction.

    The complete graph is built first and only then narrowed, so upstream
    and downstream traversal can pass through tables that do not themselves
    match the search term.

    Args:
        nodes: Mapping of table id to a dict of node data; the
            ``'attributes'`` entry (a list of strings) is searched.
        edges: Iterable of ``(source, target, transform)`` tuples. Edges
            whose endpoints are not both in ``nodes`` are skipped.
        search_term: Case-insensitive substring matched against table ids
            and attribute strings. Leading/trailing whitespace is ignored;
            an empty term selects the whole graph.
        direction: ``'upstream'`` keeps matching tables plus their
            ancestors, ``'downstream'`` keeps matching tables plus their
            descendants, and any other value (``'both'``) keeps both sides.

    Returns:
        An ``nx.DiGraph`` whose nodes carry the data from ``nodes`` and
        whose edges carry a ``transform`` attribute. It has no nodes when
        nothing matches the search term.
    """
    graph = nx.DiGraph()
    for node_id, attrs in nodes.items():
        graph.add_node(node_id, **attrs)
    for source, target, transform in edges:
        if source in graph and target in graph:
            graph.add_edge(source, target, transform=transform)

    needle = search_term.strip().lower()
    if not needle:
        return graph

    # A table matches on its id or on any of its attribute strings.
    matching = {
        node_id
        for node_id, attrs in nodes.items()
        if needle in node_id.lower()
        or any(needle in attr.lower() for attr in attrs.get('attributes', ()))
    }

    # Expand from every match along the requested direction(s) using the
    # unfiltered graph, so intermediate non-matching tables are reachable.
    keep = set(matching)
    for node_id in matching:
        if direction != 'downstream':
            keep |= nx.ancestors(graph, node_id)
        if direction != 'upstream':
            keep |= nx.descendants(graph, node_id)

    # Remove in place (rather than taking a subgraph) so the surviving
    # nodes keep their original insertion order.
    graph.remove_nodes_from([n for n in graph if n not in keep])
    return graph
def create_flow_visualization(G):
    """Convert the lineage graph into a ``Flow`` of styled nodes and edges.

    Each graph node becomes a ``Node`` labelled with the table id followed
    by its attributes (one per line) and coloured by its ``'layer'``. Each
    graph edge becomes an ``Edge`` labelled with its ``'transform'`` value
    (empty string when absent).

    Args:
        G: Directed graph whose nodes carry ``'layer'`` and ``'attributes'``.

    Returns:
        The populated ``Flow`` object.
    """
    # NOTE(review): Flow / Node / Edge and their keyword arguments are used
    # exactly as this file imports them from streamlit_flow - confirm they
    # exist with these signatures in the installed package version.
    layer_background = {
        'raw': '#FFB6C1',     # Light pink
        'silver': '#ADD8E6',  # Light blue
        'gold': '#FFD700',    # Gold
    }
    shared_node_style = {
        "border": "1px solid gray",
        "padding": "10px",
        "borderRadius": "5px",
        "fontSize": "12px",
    }

    diagram = Flow()

    # Nodes first, then edges, so every edge endpoint already exists.
    for table_id, table_data in G.nodes(data=True):
        layer = table_data['layer']
        columns = table_data['attributes']
        heading = f"{table_id}\n"
        node_style = {"background": layer_background[layer], **shared_node_style}
        diagram.add_node(
            Node(id=table_id, label=heading + "\n".join(columns), style=node_style)
        )

    for src, dst, transform in G.edges(data='transform', default=''):
        diagram.add_edge(
            Edge(source=src, target=dst, label=transform, style={"stroke": "#999"})
        )

    return diagram
def main():
    """Render the Data Lineage Explorer page.

    Draws the title, reads the search term and lineage direction from the
    sidebar (persisting both in session state), builds the filtered graph
    from the sample data, shows it as a flow diagram, and finishes with an
    expander holding table, dependency, and layer counts. When the filtered
    graph has no nodes, a warning is shown instead of the diagram.
    """
    st.title("Data Lineage Explorer")
    st.write("Explore data lineage across Raw, Silver, and Gold layers")

    # Sidebar controls
    direction_choices = ['both', 'upstream', 'downstream']
    with st.sidebar:
        st.header("Search and Filter")
        search_term = st.text_input(
            "Search tables and attributes",
            value=st.session_state.search_term,
        )
        preselected = direction_choices.index(st.session_state.direction)
        direction = st.radio(
            "Lineage Direction",
            options=direction_choices,
            index=preselected,
        )
    # Remember the current choices for the next rerun.
    st.session_state.search_term = search_term
    st.session_state.direction = direction

    # Load the data and narrow it to the requested lineage.
    nodes, edges = load_sample_data()
    G = create_lineage_graph(nodes, edges, search_term, direction)
    if G.number_of_nodes() == 0:
        st.warning("No matching nodes found. Please try a different search term.")
        return

    # Diagram
    create_flow_visualization(G).show()

    # Summary metrics
    with st.expander("📊 Lineage Statistics"):
        tables_col, deps_col, layers_col = st.columns(3)
        tables_col.metric("Total Tables", G.number_of_nodes())
        deps_col.metric("Total Dependencies", G.number_of_edges())
        distinct_layers = {data['layer'] for _, data in G.nodes(data=True)}
        layers_col.metric("Layers", len(distinct_layers))
# Render the page only when this file is run as a script, not on import.
if __name__ == '__main__':
    main()
Hi! I can help you with any questions about Streamlit and Python. What would you like to know?