# To upload files, please first save the app
import streamlit as st
from kafka import KafkaProducer, KafkaConsumer
# Function to produce a message to Kafka
def produce_message(topic, message):
    """Send one UTF-8 encoded message to a Kafka topic over SSL.

    A fresh ``KafkaProducer`` is created on every call and is always closed
    before returning, so repeated calls (one per button click in the UI) do
    not leave producer clients open.

    Args:
        topic: Name of the Kafka topic to publish to.
        message: Text to send; it is encoded as UTF-8 before publishing.

    Returns:
        ``"Message sent successfully!"`` when the broker acknowledged the
        record, otherwise ``"Error sending message: <reason>"``. Any
        ``Exception`` is caught and reported through the returned string.
    """
    try:
        # Adjust the bootstrap_servers and SSL file paths as necessary.
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9093'],
            security_protocol='SSL',
            ssl_cafile='path/to/ca-cert.pem',
            ssl_certfile='path/to/client-cert.pem',
            ssl_keyfile='path/to/client-key.pem',
        )
        try:
            future = producer.send(topic, value=message.encode('utf-8'))
            producer.flush()
            # send() is asynchronous: a delivery failure is stored on the
            # returned future rather than raised by send()/flush(). Resolving
            # the future surfaces it so we do not report a false success.
            future.get(timeout=10)
        finally:
            # Close inside the outer try so a failing close() is still
            # reported through the error string instead of propagating.
            producer.close()
        return "Message sent successfully!"
    except Exception as e:
        return f"Error sending message: {e}"
# Function to consume messages from Kafka
def consume_messages(topic):
    """Fetch the messages currently readable from a Kafka topic over SSL.

    A fresh ``KafkaConsumer`` is created on every call, configured with
    ``auto_offset_reset='earliest'`` and ``consumer_timeout_ms=2000`` so that
    iteration stops once no message arrives within the timeout. The consumer
    is always closed before returning, including when iteration or decoding
    fails part-way through.

    Args:
        topic: Name of the Kafka topic to read from.

    Returns:
        A list of message payloads decoded as UTF-8 strings (possibly empty).
        If any ``Exception`` occurs, a one-element list containing
        ``"Error consuming messages: <reason>"`` is returned instead.
    """
    try:
        # Adjust the bootstrap_servers and SSL file paths as necessary.
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=['localhost:9093'],
            security_protocol='SSL',
            ssl_cafile='path/to/ca-cert.pem',
            ssl_certfile='path/to/client-cert.pem',
            ssl_keyfile='path/to/client-key.pem',
            auto_offset_reset='earliest',
            consumer_timeout_ms=2000,  # Timeout to stop polling
        )
        try:
            return [record.value.decode('utf-8') for record in consumer]
        finally:
            # Runs on success and on failure; kept inside the outer try so a
            # failing close() is still reported through the error list.
            consumer.close()
    except Exception as e:
        return [f"Error consuming messages: {e}"]
def _show_introduction():
    """Render the "Introduction" module."""
    st.subheader("Introduction")
    st.write(
        "Welcome to the workshop demo. In this session, we demonstrate how to interact with a Kafka server "
        "that uses self-signed SSL certificates. You will see how to produce and consume messages using Kafka."
    )


def _show_kafka_demo():
    """Render the "Kafka Demo" module: a producer form and a consumer view."""
    st.subheader("Kafka Producer/Consumer Demo")
    st.write(
        "Below you can send messages to a Kafka topic and fetch messages from it. "
        "Please update the certificate paths and server details as needed."
    )

    # Producer: the topic entered here is also used by the consumer below.
    st.markdown("### Kafka Producer")
    topic_name = st.text_input("Enter Kafka Topic", value="demo")
    outgoing = st.text_input("Message to send", value="Hello, Kafka!")
    if st.button("Send Message to Kafka"):
        st.write(produce_message(topic_name, outgoing))

    # Consumer: nothing more to draw until the fetch button is pressed.
    st.markdown("### Kafka Consumer")
    if not st.button("Fetch Messages from Kafka"):
        return
    received = consume_messages(topic_name)
    if not received:
        st.write("No messages received or there was an error.")
        return
    st.write("Received messages:")
    for line in received:
        st.write("- " + line)


def _show_qa():
    """Render the "Q&A" module."""
    st.subheader("Q&A")
    st.write("Feel free to ask any questions below.")
    # The entered text is not used; the widget is only displayed.
    st.text_area("Your question:")
    if st.button("Submit Question"):
        st.write("Thank you for your question! We will get back to you soon.")


def main():
    """Build the Streamlit page: header, sidebar navigation, selected module."""
    st.title("Demo Workshop GUI with Kafka Integration")
    st.header("Kafka Server, Producer & Consumer Demo")
    st.write(
        "This demo showcases a Kafka integration using a producer to send messages and a consumer to fetch them. "
        "Ensure that your Kafka server is running with SSL as per the workshop setup."
    )

    # Sidebar for navigation: module label -> function that renders it.
    st.sidebar.title("Navigation")
    pages = {
        "Introduction": _show_introduction,
        "Kafka Demo": _show_kafka_demo,
        "Q&A": _show_qa,
    }
    choice = st.sidebar.selectbox("Select a Module", list(pages))
    render = pages.get(choice)
    if render is not None:
        render()
# Build the UI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()
# Hi! I can help you with any questions about Streamlit and Python. What would you like to know?