File size: 3,934 Bytes
ca97f63
 
 
 
 
dd0d47d
ca97f63
dd0d47d
ca97f63
 
dd0d47d
 
 
ca97f63
 
 
dd0d47d
 
 
ca97f63
 
 
dd0d47d
ca97f63
dd0d47d
 
 
 
 
ca97f63
 
dd0d47d
 
 
 
 
ca97f63
dd0d47d
ca97f63
 
 
 
 
 
dd0d47d
ca97f63
dd0d47d
ca97f63
dd0d47d
 
 
 
 
 
 
 
 
ca97f63
dd0d47d
ca97f63
dd0d47d
ca97f63
dd0d47d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import os
import sys
import time
import socket
import atexit
import subprocess
import shutil
from pathlib import Path

import streamlit as st
import cv2
from PIL import Image
import base64
import requests
from huggingface_hub import hf_hub_download

# --- Configuration (reuse from main.py) ---
# Port and base URL of the local llama.cpp OpenAI-compatible server.
PORT = 8000
BASE_URL = f"http://localhost:{PORT}/v1"
# Alias the local model is served under, so OpenAI-style clients can address it.
MODEL_ALIAS = "gpt-4-vision-preview"
# Hugging Face repo plus filenames for the SmolVLM2 GGUF weights and vision projector.
REPO_ID = "ggml-org/SmolVLM2-500M-Video-Instruct-GGUF"
MODEL_FILE = "SmolVLM2-500M-Video-Instruct-Q8_0.gguf"
PROJ_FILE  = "mmproj-SmolVLM2-500M-Video-Instruct-Q8_0.gguf"

# Download model files if missing

def download_if_missing(repo_id: str, filename: str) -> None:
    """Ensure *filename* exists in the working directory.

    If the file is already present this is a no-op; otherwise it is fetched
    from the Hugging Face Hub cache and copied next to the script.
    """
    if os.path.isfile(filename):
        return
    cached_path = hf_hub_download(repo_id=repo_id, filename=filename)
    shutil.copy(cached_path, filename)

# Ensure models on startup

def ensure_models() -> None:
    """Download both GGUF files (weights + vision projector) if missing.

    Fixes the previous lambda-assigned-to-a-name idiom (PEP 8 E731) that
    abused a list literal purely for its side effects.
    """
    download_if_missing(REPO_ID, MODEL_FILE)
    download_if_missing(REPO_ID, PROJ_FILE)

ensure_models()

# Start local server for captioning

def start_server() -> subprocess.Popen:
    """Launch the llama.cpp OpenAI-compatible server as a child process.

    Registers termination via ``atexit``, then polls the TCP port until the
    server accepts connections (up to ~10 s). Returns the live process.

    Raises:
        RuntimeError: if the port never becomes reachable.
    """
    args = [
        sys.executable, "-m", "llama_cpp.server",
        "--model", MODEL_FILE,
        "--clip_model_path", PROJ_FILE,
        "--chat_format", "llava-1-5",
        "--port", str(PORT),
        "--model_alias", MODEL_ALIAS
    ]
    server = subprocess.Popen(args)
    atexit.register(server.terminate)
    # Poll readiness: 40 attempts x 0.25 s sleep (plus a 1 s connect timeout each).
    attempts_left = 40
    while attempts_left > 0:
        attempts_left -= 1
        try:
            conn = socket.create_connection(("localhost", PORT), timeout=1)
        except OSError:
            time.sleep(0.25)
            continue
        conn.close()
        return server
    server.terminate()
    raise RuntimeError(f"Server failed to start on port {PORT}.")

server_proc = start_server()

# Send image to caption API

def caption_image_file(path: str) -> str:
    """Send the JPEG at *path* to the local chat-completions API and return the caption.

    Args:
        path: Filesystem path to a JPEG image.

    Returns:
        The model's caption text (first choice's message content).

    Raises:
        requests.HTTPError: if the server responds with an error status.
        requests.Timeout: if the server does not answer within the timeout.
    """
    # Path.read_bytes() closes the file promptly; the original open(...).read()
    # leaked the file handle.
    b64 = base64.b64encode(Path(path).read_bytes()).decode()
    uri = f"data:image/jpeg;base64,{b64}"
    payload = {
        "model": MODEL_ALIAS,
        "messages": [
            {"role": "system", "content": (
                "You are a precise image-captioning assistant. "
                "Identify the main subject, their clothing, posture, and environment."
            )},
            {"role": "user", "content": [
                {"type": "image_url", "image_url": {"url": uri}},
                {"type": "text",      "text": "Caption this image in one detailed sentence."}
            ]}
        ],
        "temperature": 0.1,
        "max_tokens": 100
    }
    # A finite timeout keeps the UI from hanging forever if the server dies;
    # vision inference on CPU can be slow, hence the generous budget.
    resp = requests.post(BASE_URL + "/chat/completions", json=payload, timeout=120)
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"]

# Helper to handle PIL image

def run_caption(pil_img: Image.Image) -> str:
    """Save *pil_img* to a temporary JPEG and return the model's caption for it.

    The original wrote a fixed ``frame.jpg`` into the CWD, which clobbers any
    existing file, races across concurrent sessions, and is never cleaned up.
    A named temp file fixes all three; ``delete=False`` + manual removal keeps
    it compatible with Windows (where an open temp file cannot be reopened).
    """
    import tempfile  # local import: only needed here, keeps file-level deps unchanged
    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
        tmp_path = tmp.name
    try:
        pil_img.save(tmp_path)
        return caption_image_file(tmp_path)
    finally:
        os.remove(tmp_path)

# --- Streamlit UI ---
st.set_page_config(page_title="Real-Time Camera Captioning", layout="wide")
st.title("🎥 Real-Time Camera Captioning")

# Sidebar controls: caption cadence plus start/stop buttons.
interval = st.sidebar.slider("Interval between captions (seconds)", min_value=1, max_value=10, value=3)
start = st.sidebar.button("Start")
stop = st.sidebar.button("Stop")

# Persist the run flag across Streamlit reruns.
if 'running' not in st.session_state:
    st.session_state.running = False

if start:
    st.session_state.running = True
if stop:
    st.session_state.running = False

# Placeholders so the frame and caption update in place instead of appending.
frame_placeholder = st.empty()
caption_placeholder = st.empty()

# OpenCV camera (device 0). Fail fast if it cannot be opened — the original
# looped on cap.read() errors without ever checking isOpened().
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    st.error("Unable to open camera device 0.")
    st.session_state.running = False

try:
    while st.session_state.running:
        ret, frame = cap.read()
        if not ret:
            st.error("Unable to read from camera.")
            break
        # OpenCV delivers BGR; PIL/Streamlit expect RGB.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = Image.fromarray(rgb)

        # Show frame
        frame_placeholder.image(img, caption="Live Feed", use_container_width=True)

        # Generate and show caption
        with st.spinner("Generating caption..."):
            caption = run_caption(img)
        caption_placeholder.markdown(f"**Caption:** {caption}")

        time.sleep(interval)
finally:
    # Always release the camera, even if captioning or rendering raises;
    # the original skipped cap.release() on any exception in the loop.
    cap.release()