Multimodal AI Assistant with Memory and Speech

Multimodal Input Support: accepts text, images, audio, and video, enabling applications such as visual question answering and speech transcription. Conversation memory and a saved user profile carry context across turns.

Speech Interaction: the assistant's replies can be spoken back via text-to-speech, with a selectable voice language (plus speed and emotion controls exposed in the sidebar).
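The listing below assumes the following third-party packages, inferred from its imports (versions are not pinned in the original): streamlit, openai, pillow, opencv-python, gTTS, audio-recorder-streamlit. A typical way to run it locally, assuming the file is saved as app.py:

pip install streamlit openai pillow opencv-python gTTS audio-recorder-streamlit
streamlit run app.py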
import streamlit as st
import openai
import cv2
from PIL import Image
import base64
import io
import json
from datetime import datetime
import tempfile
import os
from audio_recorder_streamlit import audio_recorder
from gtts import gTTS
# Configure page
st.set_page_config(
page_title="Multimodal AI Assistant",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded"
)
# Initialize session state
if "messages" not in st.session_state:
st.session_state.messages = []
if "voice_enabled" not in st.session_state:
st.session_state.voice_enabled = False
if "conversation_memory" not in st.session_state:
st.session_state.conversation_memory = []
if "user_profile" not in st.session_state:
st.session_state.user_profile = {}
# Sidebar configuration
with st.sidebar:
st.title("🤖 AI Assistant Settings")
# API Configuration
st.subheader("API Configuration")
api_key = st.text_input("OpenAI API Key", type="password", help="Enter your OpenAI API key")
# Voice Settings
st.subheader("🎤 Voice Settings")
voice_enabled = st.checkbox("Enable Voice", value=st.session_state.voice_enabled)
st.session_state.voice_enabled = voice_enabled
if voice_enabled:
voice_language = st.selectbox("Voice Language", ["en", "es", "fr", "de", "it", "ja", "ko", "zh"])
voice_speed = st.slider("Speech Speed", 0.5, 2.0, 1.0, 0.1)
voice_emotion = st.selectbox("Voice Emotion", ["neutral", "happy", "sad", "excited", "calm"])
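        # Note: voice_speed and voice_emotion are UI placeholders for now; the gTTS-based
        # text_to_speech() helper below only uses the selected language.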
# Memory Settings
st.subheader("🧠 Memory Settings")
max_memory = st.slider("Max Memory Items", 10, 100, 50)
clear_memory = st.button("Clear Memory")
if clear_memory:
st.session_state.conversation_memory = []
st.success("Memory cleared!")
# User Profile
st.subheader("👤 User Profile")
user_name = st.text_input("Your Name", value=st.session_state.user_profile.get("name", ""))
user_preferences = st.text_area("Preferences", value=st.session_state.user_profile.get("preferences", ""))
if st.button("Save Profile"):
st.session_state.user_profile = {
"name": user_name,
"preferences": user_preferences
}
st.success("Profile saved!")
# Main interface
st.title("🤖 Multimodal AI Assistant with Memory and Speech")
st.markdown("*Supports text, images, audio, and video with persistent memory*")
# Initialize OpenAI client
if api_key:
client = openai.OpenAI(api_key=api_key)
else:
st.warning("Please enter your OpenAI API key in the sidebar to use the assistant.")
st.stop()
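# For a deployed app, the key could instead come from Streamlit secrets, e.g.
# api_key = st.secrets.get("OPENAI_API_KEY"), rather than being typed into the sidebar.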
# Helper functions
def encode_image(image):
"""Encode image to base64"""
buffered = io.BytesIO()
image.save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode()
def process_audio_input(audio_bytes):
"""Process audio input and convert to text"""
if audio_bytes:
try:
# Save audio to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
tmp_file.write(audio_bytes)
tmp_file_path = tmp_file.name
# Use OpenAI Whisper for transcription
with open(tmp_file_path, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file
)
# Clean up
os.unlink(tmp_file_path)
return transcript.text
except Exception as e:
st.error(f"Error processing audio: {str(e)}")
return None
return None
def text_to_speech(text, language="en"):
"""Convert text to speech using gTTS"""
try:
tts = gTTS(text=text, lang=language, slow=False)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
tts.save(tmp_file.name)
return tmp_file.name
except Exception as e:
st.error(f"Error generating speech: {str(e)}")
return None
def add_to_memory(user_input, ai_response, input_type="text"):
"""Add interaction to conversation memory"""
memory_item = {
"timestamp": datetime.now().isoformat(),
"user_input": user_input,
"ai_response": ai_response,
"input_type": input_type
}
st.session_state.conversation_memory.append(memory_item)
# Keep only recent memories
if len(st.session_state.conversation_memory) > max_memory:
st.session_state.conversation_memory = st.session_state.conversation_memory[-max_memory:]
def generate_ai_response(messages, images=None):
"""Generate AI response using OpenAI API"""
try:
# Build context from memory
context = ""
if st.session_state.conversation_memory:
context = "Previous conversation context:\n"
for memory in st.session_state.conversation_memory[-5:]: # Last 5 interactions
context += f"User: {memory['user_input']}\nAssistant: {memory['ai_response']}\n\n"
# Add user profile to context
if st.session_state.user_profile.get("name"):
context += f"User's name: {st.session_state.user_profile['name']}\n"
if st.session_state.user_profile.get("preferences"):
context += f"User preferences: {st.session_state.user_profile['preferences']}\n"
# Prepare messages for API
api_messages = []
if context:
api_messages.append({"role": "system", "content": context})
        for i, message in enumerate(messages):
            if message["role"] == "user":
                content = [{"type": "text", "text": message["content"]}]
                # Attach uploaded images only to the most recent user message,
                # so earlier turns are not resent with duplicate image data
                if images and i == len(messages) - 1:
                    for img in images:
                        img_base64 = encode_image(img)
                        content.append({
                            "type": "image_url",
                            "image_url": {"url": f"data:image/png;base64,{img_base64}"}
                        })
                api_messages.append({"role": "user", "content": content})
            else:
                api_messages.append(message)
response = client.chat.completions.create(
model="gpt-4-vision-preview",
messages=api_messages,
max_tokens=1000
)
return response.choices[0].message.content
except Exception as e:
return f"Error generating response: {str(e)}"
# Input methods
st.subheader("Choose Input Method")
# Create tabs for different input types
tab1, tab2, tab3, tab4 = st.tabs(["💬 Text", "🖼️ Image", "🎤 Audio", "📹 Video"])
user_input = None
uploaded_images = []
input_type = "text"
with tab1:
    st.subheader("Text Input")
    text_message = st.text_area("Enter your message:", height=100)
    # Only submit when the button is clicked, so a rerun doesn't
    # resend whatever text is still sitting in the text area
    if st.button("Send Text", type="primary") and text_message:
        user_input = text_message
        input_type = "text"
with tab2:
st.subheader("Image Input")
uploaded_files = st.file_uploader(
"Upload images",
type=['png', 'jpg', 'jpeg'],
accept_multiple_files=True
)
if uploaded_files:
uploaded_images = []
for uploaded_file in uploaded_files:
image = Image.open(uploaded_file)
uploaded_images.append(image)
            st.image(image, caption=f"Uploaded: {uploaded_file.name}", use_container_width=True)
image_question = st.text_input("Ask a question about the image(s):")
if st.button("Analyze Images", type="primary"):
if uploaded_images and image_question:
user_input = image_question
input_type = "image"
with tab3:
st.subheader("Audio Input")
audio_bytes = audio_recorder(
text="Click to record",
recording_color="#e74c3c",
neutral_color="#34495e"
)
if audio_bytes:
st.audio(audio_bytes, format="audio/wav")
if st.button("Process Audio", type="primary"):
transcribed_text = process_audio_input(audio_bytes)
if transcribed_text:
user_input = transcribed_text
input_type = "audio"
st.success(f"Transcribed: {transcribed_text}")
with tab4:
st.subheader("Video Input")
video_file = st.file_uploader("Upload a video", type=['mp4', 'avi', 'mov'])
if video_file:
st.video(video_file)
video_question = st.text_input("Ask a question about the video:")
if st.button("Analyze Video", type="primary"):
if video_question:
# For video, we'll extract a frame and treat it as an image
try:
# Save video temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_file:
tmp_file.write(video_file.read())
tmp_file_path = tmp_file.name
# Extract frame using OpenCV
cap = cv2.VideoCapture(tmp_file_path)
ret, frame = cap.read()
if ret:
# Convert frame to PIL Image
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_image = Image.fromarray(frame_rgb)
uploaded_images = [frame_image]
user_input = f"[Video Analysis] {video_question}"
input_type = "video"
st.success("Video frame extracted for analysis")
cap.release()
os.unlink(tmp_file_path)
except Exception as e:
st.error(f"Error processing video: {str(e)}")
# Process input and generate response
if user_input:
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": user_input})
# Generate AI response
with st.spinner("Generating response..."):
ai_response = generate_ai_response(st.session_state.messages, uploaded_images)
# Add AI response to chat history
st.session_state.messages.append({"role": "assistant", "content": ai_response})
# Add to memory
add_to_memory(user_input, ai_response, input_type)
# Text-to-speech if enabled
if st.session_state.voice_enabled and 'voice_language' in locals():
with st.spinner("Generating speech..."):
audio_file = text_to_speech(ai_response, voice_language)
if audio_file:
st.audio(audio_file)
# Clean up audio file
try:
os.unlink(audio_file)
except:
pass
# Display chat history
st.subheader("💬 Conversation")
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Display conversation memory
if st.session_state.conversation_memory:
with st.expander("🧠 Conversation Memory"):
for i, memory in enumerate(reversed(st.session_state.conversation_memory[-10:])):
st.write(f"**{memory['input_type'].title()} Input {len(st.session_state.conversation_memory)-i}:**")
st.write(f"*{memory['timestamp']}*")
st.write(f"**You:** {memory['user_input'][:100]}...")
st.write(f"**Assistant:** {memory['ai_response'][:100]}...")
st.divider()
# Quick actions
st.subheader("⚡ Quick Actions")
col1, col2, col3, col4 = st.columns(4)
with col1:
if st.button("Clear Chat"):
st.session_state.messages = []
st.rerun()
with col2:
    # Offer the download directly; hiding it behind a second button
    # would make it disappear again on the next rerun
    chat_data = {
        "messages": st.session_state.messages,
        "memory": st.session_state.conversation_memory,
        "profile": st.session_state.user_profile
    }
    st.download_button(
        "Export Chat",
        json.dumps(chat_data, indent=2),
        "chat_history.json",
        "application/json"
    )
with col3:
if st.button("Voice Test"):
if st.session_state.voice_enabled:
test_audio = text_to_speech("Hello! This is a voice test.", voice_language if 'voice_language' in locals() else 'en')
if test_audio:
st.audio(test_audio)
with col4:
if st.button("Memory Stats"):
st.info(f"Memory items: {len(st.session_state.conversation_memory)}")
# Footer
st.markdown("---")
st.markdown(
"🤖 **Multimodal AI Assistant** - Supports text, images, audio, and video with persistent memory and speech capabilities"
)