Create an app to show a duckdb database. I would like to see schemas and tables/views on the left hand side. Ideally the tables/views can be expanded to show their columns (and column types).
To upload files, please first save the app
import streamlit as st
import duckdb
import pandas as pd
import os
from pathlib import Path
st.set_page_config(layout="wide", page_title="DuckDB Explorer")
st.title("DuckDB Explorer")
# File uploader for database file
uploaded_file = st.sidebar.file_uploader("Upload DuckDB database file", type=["db"])
# Option to use in-memory database
use_memory = st.sidebar.checkbox("Use in-memory database", value=True)
# Connect to database
if uploaded_file is not None:
# Save the uploaded file to a temporary file
with open("temp.db", "wb") as f:
f.write(uploaded_file.getbuffer())
conn = duckdb.connect("temp.db", read_only=False)
st.sidebar.success(f"Connected to uploaded database")
elif use_memory:
conn = duckdb.connect(":memory:", read_only=False)
st.sidebar.success("Connected to in-memory database")
# Add sample data option for in-memory database
if st.sidebar.checkbox("Load sample data"):
conn.execute("CREATE TABLE customers AS SELECT * FROM (VALUES (1, 'John', 'Doe'), (2, 'Jane', 'Smith')) AS t(id, first_name, last_name)")
conn.execute("CREATE TABLE orders AS SELECT * FROM (VALUES (1, 1, 100.0, '2023-01-01'), (2, 1, 200.0, '2023-01-15'), (3, 2, 150.0, '2023-02-01')) AS t(order_id, customer_id, amount, order_date)")
st.sidebar.success("Sample data loaded")
else:
st.warning("Please upload a database file or use in-memory database")
st.stop()
# Function to get schema information
def get_schemas():
schemas = conn.execute("SELECT schema_name FROM information_schema.schemata").fetchall()
return [schema[0] for schema in schemas]
# Function to get tables for a schema
def get_tables(schema):
tables = conn.execute(f"SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = '{schema}'").fetchall()
return tables
# Function to get columns for a table
def get_columns(schema, table):
columns = conn.execute(f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = '{schema}'
AND table_name = '{table}'
ORDER BY ordinal_position
""").fetchall()
return columns
# Create two columns for layout
col1, col2 = st.columns([1, 3])
# Left sidebar for schema/table selection
with col1:
st.subheader("Database Explorer")
schemas = get_schemas()
for schema in schemas:
schema_expander = st.expander(f"Schema: {schema}")
with schema_expander:
tables = get_tables(schema)
for table_name, table_type in tables:
icon = "📊" if table_type == "VIEW" else "📋"
table_expander = st.expander(f"{icon} {table_name}")
with table_expander:
columns = get_columns(schema, table_name)
for col_name, col_type in columns:
st.text(f"{col_name} ({col_type})")
# Add button to view table in right panel
if st.button(f"View {table_name}", key=f"view_{schema}_{table_name}"):
st.session_state.selected_schema = schema
st.session_state.selected_table = table_name
# Initialize session state for selected table
if 'selected_schema' not in st.session_state:
st.session_state.selected_schema = None
if 'selected_table' not in st.session_state:
st.session_state.selected_table = None
# Right panel for displaying selected table
with col2:
if st.session_state.selected_schema and st.session_state.selected_table:
st.subheader(f"Table: {st.session_state.selected_schema}.{st.session_state.selected_table}")
# Add search filter
search_term = st.text_input("Search (column values)")
# Add limit control
limit = st.number_input("Limit rows", min_value=1, max_value=1000, value=100)
# Build query with search filter if provided
query = f"SELECT * FROM {st.session_state.selected_schema}.{st.session_state.selected_table}"
if search_term:
# Get column names to build search condition
columns = get_columns(st.session_state.selected_schema, st.session_state.selected_table)
search_conditions = []
for col_name, col_type in columns:
if "char" in col_type.lower() or "text" in col_type.lower():
search_conditions.append(f"CAST({col_name} AS VARCHAR) ILIKE '%{search_term}%'")
if search_conditions:
query += " WHERE " + " OR ".join(search_conditions)
# Add limit
query += f" LIMIT {limit}"
# Execute query and display results
try:
result = conn.execute(query).fetchdf()
st.dataframe(result, use_container_width=True)
st.text(f"Showing {len(result)} rows")
# Add SQL query editor
st.subheader("Custom SQL Query")
custom_query = st.text_area("Enter SQL query:",
value=f"SELECT * FROM {st.session_state.selected_schema}.{st.session_state.selected_table} LIMIT 100;")
if st.button("Run Query"):
try:
custom_result = conn.execute(custom_query).fetchdf()
st.dataframe(custom_result, use_container_width=True)
st.text(f"Query returned {len(custom_result)} rows")
except Exception as e:
st.error(f"Error executing query: {str(e)}")
except Exception as e:
st.error(f"Error: {str(e)}")
else:
st.info("Select a table from the left panel to view its contents")
# Add SQL console at the bottom
st.subheader("SQL Console")
sql_query = st.text_area("Enter SQL query:", height=100)
if st.button("Execute"):
if sql_query:
try:
# Check if query is a SELECT statement
is_select = sql_query.strip().upper().startswith("SELECT")
if is_select:
result = conn.execute(sql_query).fetchdf()
st.dataframe(result, use_container_width=True)
st.success(f"Query executed successfully. Returned {len(result)} rows.")
else:
conn.execute(sql_query)
st.success("Query executed successfully.")
except Exception as e:
st.error(f"Error executing query: {str(e)}")
# Clean up
if uploaded_file is not None and os.path.exists("temp.db"):
conn.close()
# Only attempt to remove if not in use
try:
os.remove("temp.db")
except:
pass
Hi! I can help you with any questions about Streamlit and Python. What would you like to know?