我正在準備一個 apache kafka 的 workshop,需要一個能夠 展示 kafka pub/sub 能力的 GUI。
Drop files here
or click to upload
import streamlit as st
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
import time
from faker import Faker
import threading
import queue
# 初始化 Faker 用於生成模擬數據
fake = Faker()
# Kafka 配置
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_TOPIC = 'demo_topic'
# 初始化 session state
if 'messages' not in st.session_state:
st.session_state.messages = queue.Queue()
if 'consumer_running' not in st.session_state:
st.session_state.consumer_running = False
def create_producer():
try:
return KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
except Exception as e:
st.error(f"無法連接到 Kafka 伺服器: {str(e)}")
return None
def consume_messages():
try:
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
auto_offset_reset='latest',
enable_auto_commit=True
)
for message in consumer:
if not st.session_state.consumer_running:
break
st.session_state.messages.put({
'value': message.value,
'timestamp': datetime.fromtimestamp(message.timestamp/1000.0).strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
st.error(f"消費者錯誤: {str(e)}")
st.title('Kafka Pub/Sub 示範')
# 分成兩列
col1, col2 = st.columns(2)
with col1:
st.header('生產者 (Producer)')
message_type = st.selectbox(
'選擇消息類型',
['用戶資料', '訂單資料', '自定義消息']
)
if message_type == '用戶資料':
message = {
'type': 'user',
'name': fake.name(),
'email': fake.email(),
'address': fake.address()
}
elif message_type == '訂單資料':
message = {
'type': 'order',
'order_id': fake.uuid4(),
'product': fake.word(),
'quantity': fake.random_int(min=1, max=100),
'price': round(fake.random_float(min=10, max=1000), 2)
}
else:
message = st.text_area('輸入自定義 JSON 消息', '{"key": "value"}')
try:
message = json.loads(message)
except json.JSONDecodeError:
st.error('請輸入有效的 JSON 格式')
if st.button('發送消息'):
producer = create_producer()
if producer:
try:
producer.send(KAFKA_TOPIC, message)
producer.flush()
st.success('消息發送成功!')
except Exception as e:
st.error(f'發送消息時發生錯誤: {str(e)}')
finally:
producer.close()
with col2:
st.header('消費者 (Consumer)')
if not st.session_state.consumer_running:
if st.button('開始接收消息'):
st.session_state.consumer_running = True
consumer_thread = threading.Thread(target=consume_messages)
consumer_thread.start()
else:
if st.button('停止接收消息'):
st.session_state.consumer_running = False
# 顯示接收到的消息
st.subheader('接收到的消息')
# 創建一個容器來顯示消息
message_container = st.empty()
while True:
try:
while not st.session_state.messages.empty():
msg = st.session_state.messages.get_nowait()
with message_container.container():
st.json(msg)
time.sleep(0.1)
except queue.Empty:
break
st.sidebar.markdown("""
### 使用說明
1. 確保 Kafka 伺服器正在運行於 localhost:9092
2. 在生產者面板選擇消息類型或輸入自定義消息
3. 點擊「發送消息」按鈕發送消息
4. 在消費者面板點擊「開始接收消息」來監聽消息
5. 接收到的消息會即時顯示在消費者面板中
### 注意事項
- 需要預先安裝並運行 Apache Kafka
- 請確保 Kafka Topic 'demo_topic' 已經創建
- 如遇到連接問題,請檢查 Kafka 配置
""")
Hi! I can help you with any questions about Streamlit and Python. What would you like to know?