Create a RAG pipeline for video
To upload files, please first save the app
import streamlit as st
import os
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.llms import OpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import TextLoader
from pytube import YouTube
from moviepy.editor import VideoFileClip
import pytesseract
import tempfile
# Load environment variables via python-dotenv
load_dotenv()

# Seed the session state; keys that are already present are left untouched.
if "chat_history" not in st.session_state:
    st.session_state["chat_history"] = []
for _state_key in ("vector_store", "chain"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None
def extract_frames_text(video_path, interval_seconds=1):
    """Extract text from video frames using OCR.

    Frames are sampled at ``t = 0, interval_seconds, 2 * interval_seconds, ...``
    up to (but not including) ``int(video.duration)`` and each one is passed to
    ``pytesseract.image_to_string``.

    Args:
        video_path: Path of the video file opened with ``VideoFileClip``.
        interval_seconds: Whole number of seconds between sampled frames.
            Defaults to 1 (one frame per second).

    Returns:
        A newline-joined string containing one ``"Time {t}s: {text}"`` entry
        for every sampled frame whose OCR output is not blank. Returns an
        empty string when no frame produced text.

    Raises:
        ValueError: If ``interval_seconds`` is not a positive integer.
    """
    # Validate up front so a bad step fails clearly before the video is opened.
    if not isinstance(interval_seconds, int) or interval_seconds < 1:
        raise ValueError("interval_seconds must be a positive integer")

    video = VideoFileClip(video_path)
    try:
        texts = []
        for t in range(0, int(video.duration), interval_seconds):
            frame = video.get_frame(t)
            text = pytesseract.image_to_string(frame)
            if text.strip():  # Only add non-empty text
                texts.append(f"Time {t}s: {text}")
    finally:
        # Release the clip even when frame decoding or OCR raises.
        video.close()
    return "\n".join(texts)
def process_video(url):
    """Download a video, OCR its frames and build the retrieval chain.

    The video is downloaded into a temporary directory, its frame text is
    extracted with ``extract_frames_text``, split into overlapping chunks,
    embedded into a Chroma vector store and wrapped in a
    ``ConversationalRetrievalChain``. On success the vector store and chain
    are stored in ``st.session_state`` and the chat history is cleared.

    Args:
        url: Video URL handed to ``YouTube``.

    Returns:
        True when the chain was built and stored; False otherwise. Every
        failure is reported to the user through ``st.error``.
    """
    import uuid

    try:
        # Download video
        yt = YouTube(url)
        stream = yt.streams.filter(progressive=True, file_extension='mp4').first()
        if stream is None:
            # Without this check the failure surfaces as an opaque
            # AttributeError on None.
            st.error("Error processing video: no progressive MP4 stream is available.")
            return False

        with tempfile.TemporaryDirectory() as temp_dir:
            # Use the path reported by the download call instead of assuming
            # how the library names the file on disk.
            video_path = stream.download(output_path=temp_dir, filename="video.mp4")
            # Extract text from video frames while the file still exists
            text_content = extract_frames_text(video_path)

        if not text_content.strip():
            # An empty corpus cannot be embedded; report it explicitly.
            st.error("Error processing video: no readable text was found in the video frames.")
            return False

        # Create documents from text
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        texts = text_splitter.split_text(text_content)

        # Create vector store. A unique collection name per video keeps chunks
        # from a previously processed video out of this store.
        embeddings = OpenAIEmbeddings()
        vector_store = Chroma.from_texts(
            texts,
            embeddings,
            collection_name=f"video-{uuid.uuid4().hex}",
        )

        # Create conversation chain
        chain = ConversationalRetrievalChain.from_llm(
            llm=OpenAI(temperature=0.7),
            retriever=vector_store.as_retriever(),
        )

        # Publish only after every step succeeded so the session never holds a
        # new vector store paired with an old chain.
        st.session_state.vector_store = vector_store
        st.session_state.chain = chain
        # History from a previous video would be fed to the new chain as
        # context, so start the conversation fresh.
        st.session_state.chat_history = []
        return True
    except Exception as e:
        # UI boundary: surface any failure to the user instead of crashing.
        st.error(f"Error processing video: {str(e)}")
        return False
def get_response(query):
    """Answer ``query`` with the conversation chain held in session state.

    Returns a fixed prompt asking the user to process a video when no chain
    has been stored yet. Otherwise the chain is called with the question and
    the accumulated chat history, the ``(query, answer)`` pair is appended to
    that history, and the answer is returned.
    """
    chain = st.session_state.chain
    if chain is None:
        return "Please process a video first."

    history = st.session_state.chat_history
    payload = {"question": query, "chat_history": history}
    answer = chain(payload)['answer']
    history.append((query, answer))
    return answer
# UI
st.title("Video RAG Assistant")

# Video URL input; the button is only rendered once a URL has been entered
video_url = st.text_input("Enter YouTube video URL")
if video_url:
    if st.button("Process Video"):
        with st.spinner("Processing video..."):
            if process_video(video_url):
                st.success("Video processed successfully!")
# Chat interface: shown only once a chain is stored in session state
if st.session_state.chain is not None:
    # Replay the stored conversation, one (question, answer) pair at a time
    for past_question, past_answer in st.session_state.chat_history:
        with st.chat_message("user"):
            st.write(past_question)
        with st.chat_message("assistant"):
            st.write(past_answer)

    # Handle a newly submitted question, if any
    query = st.chat_input("Ask about the video")
    if query:
        with st.chat_message("user"):
            st.write(query)
        with st.chat_message("assistant"):
            response = get_response(query)
            st.write(response)
Hi! I can help you with any questions about Streamlit and Python. What would you like to know?