# app.py
import streamlit as st
st.set_page_config(layout="wide")

import av
import cv2
import time
import tempfile
import os
from pathlib import Path
from huggingface_hub import hf_hub_download
from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, RTCConfiguration
from llama_cpp import Llama
from llama_cpp.llama_chat_format import LlamaChatCompletionHandlerRegistry, Llava15ChatHandler
from termcolor import cprint

# —————————————————————————————————————————
# 1) Inline definition & registration of SmolVLM2ChatHandler
class SmolVLM2ChatHandler(Llava15ChatHandler):
    CHAT_FORMAT = (
        "<|im_start|>"
        "{% for message in messages %}"
        "{{ message['role'] | capitalize }}"
        "{% if message['role']=='user' and message['content'][0]['type']=='image_url' %}:"
        "{% else %}: "
        "{% endif %}"
        "{% for content in message['content'] %}"
        "{% if content['type']=='text' %}{{ content['text'] }}"
        "{% elif content['type']=='image_url' %}"
        "{% if content['image_url'] is string %}"
        "{{ content['image_url'] }}\n"
        "{% elif content['image_url'] is mapping %}"
        "{{ content['image_url']['url'] }}\n"
        "{% endif %}"
        "{% endif %}"
        "{% endfor %}"
        "\n"
        "{% endfor %}"
        "{% if add_generation_prompt %}Assistant:{% endif %}"
    )

# Overwrite any previous registration
LlamaChatCompletionHandlerRegistry().register_chat_completion_handler(
    "smolvlm2", SmolVLM2ChatHandler, overwrite=True
)

# —————————————————————————————————————————
# 2) Model & CLIP files — download if missing
MODEL_FILE = "SmolVLM2-500M-Video-Instruct.Q8_0.gguf"
CLIP_FILE = "mmproj-SmolVLM2-500M-Video-Instruct-Q8_0.gguf"
MODEL_REPO = "mradermacher/SmolVLM2-500M-Video-Instruct-GGUF"
CLIP_REPO = "ggml-org/SmolVLM2-500M-Video-Instruct-GGUF"

def ensure_models():
    # Download from the Hub on first run, then symlink into the working directory
    if not os.path.exists(MODEL_FILE):
        path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
        os.symlink(path, MODEL_FILE)
    if not os.path.exists(CLIP_FILE):
        path = hf_hub_download(repo_id=CLIP_REPO, filename=CLIP_FILE)
        os.symlink(path, CLIP_FILE)

ensure_models()

@st.cache_resource
def load_llm():
    handler = SmolVLM2ChatHandler(clip_model_path=CLIP_FILE, verbose=False)
    return Llama(
        model_path=MODEL_FILE,
        chat_handler=handler,
        n_ctx=8192,
        verbose=False,
    )

llm = load_llm()

# —————————————————————————————————————————
# 3) Helper to run a single frame through the model (with debug)
def caption_frame(frame):
    # Write the frame to a temp JPEG so it can be passed to the model as a file URI
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
        cv2.imwrite(f.name, frame)
        uri = Path(f.name).absolute().as_uri()

    messages = [
        {
            "role": "system",
            "content": (
                "Focus only on describing the key dramatic action or notable event occurring "
                "in this image. Skip general context or scene-setting details unless they are "
                "crucial to understanding the main action."
            ),
        },
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": uri}},
                {"type": "text", "text": "What is happening in this image?"},
            ],
        },
    ]

    print("DEBUG ▶ caption_frame: invoking LLM")
    resp = llm.create_chat_completion(
        messages=messages,
        max_tokens=128,
        temperature=0.1,
        repeat_penalty=1.1,  # discourage exact token repeats
        stop=["<end_of_utterance>"],  # SmolVLM2's end-of-utterance token
    )
    os.unlink(f.name)  # remove the temp frame image now that inference is done

    out = (resp["choices"][0].get("message", {}).get("content") or "").strip()
    print(f"DEBUG ▶ LLM returned: {out!r}")
    return out
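# —————————————————————————————————————————
# Optional sanity check for caption_frame outside of Streamlit (hypothetical
# snippet; assumes a local image named "test.jpg" exists):
#
#   frame = cv2.imread("test.jpg")
#   if frame is not None:
#       print(caption_frame(frame))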
# —————————————————————————————————————————
# 4) Streamlit UI + WebRTC configuration
st.title("🎥 Real-Time Camera Captioning with SmolVLM2 (CPU)")

interval_ms = st.slider(
    "Caption every N ms", min_value=100, max_value=10000, value=1000, step=100
)

RTC_CONFIG = RTCConfiguration({
    "iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]
})

class CaptionProcessor(VideoProcessorBase):
    def __init__(self):
        self.interval = 1.0
        self.last_time = time.time()
        self.caption = ""

    def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
        img = frame.to_ndarray(format="bgr24")
        now = time.time()
        if now - self.last_time >= self.interval:
            self.last_time = now
            print("DEBUG ▶ CaptionProcessor.recv: time reached, generating caption")
            # NOTE: inference runs synchronously on the WebRTC callback thread,
            # so the video stream stalls while a caption is being generated.
            self.caption = caption_frame(img)
        return av.VideoFrame.from_ndarray(img, format="bgr24")

ctx = webrtc_streamer(
    key="smolvlm2-captioner",
    video_processor_factory=CaptionProcessor,
    rtc_configuration=RTC_CONFIG,
    media_stream_constraints={"video": True, "audio": False},
)

# Update the processor interval from the slider
if ctx.video_processor:
    ctx.video_processor.interval = interval_ms / 1000.0

# Placeholder for showing captions
placeholder = st.empty()
if ctx.state.playing:
    placeholder.markdown("**Caption:** _Waiting for first inference…_")
    while ctx.state.playing:
        vp = ctx.video_processor
        if vp is not None:
            txt = vp.caption or "_…thinking…_"
        else:
            txt = "_…loading…_"
        placeholder.markdown(f"**Caption:** {txt}")
        time.sleep(0.1)
else:
    st.info("▶️ Click **Start** above to begin streaming")
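# —————————————————————————————————————————
# Usage: a minimal sketch, assuming the dependencies imported above are
# installed in the active environment:
#
#   pip install streamlit streamlit-webrtc llama-cpp-python huggingface_hub \
#       av opencv-python termcolor
#   streamlit run app.py
#
# On first launch, ensure_models() downloads the two GGUF files from the
# Hugging Face Hub and symlinks them into the working directory.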