# LiveCC / app.py
import os
import gradio as gr
from demo.infer import LiveCCDemoInfer

class GradioBackend:
    waiting_video_response = 'Waiting for video input...'
    not_found_video_response = 'Video does not exist...'
    mode2api = {
        'Real-Time Commentary': 'live_cc',
        'Conversation': 'video_qa'
    }

    def __init__(self, model_path: str = 'chenjoya/LiveCC-7B-Instruct'):
        self.infer = LiveCCDemoInfer(model_path)
        from kokoro import KPipeline
        self.audio_pipeline = KPipeline(lang_code='a')

    def __call__(self, query: str = None, state: dict = {}, mode: str = 'Real-Time Commentary', **kwargs):
        return getattr(self.infer, self.mode2api[mode])(query=query, state=state, **kwargs)

gradio_backend = GradioBackend()
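
# Rough usage sketch (an illustration, not part of the app): the streaming loop below
# consumes the 'live_cc' API as a generator of ((start, stop), response, state) tuples,
# while gr_chatinterface_fn expects 'video_qa' to return a single (response, state) pair.
#   state = {'video_path': 'demo/sources/howto_fix_laptop_mute_720p.mp4', 'video_timestamp': 0.0}
#   for (start, stop), response, state in gradio_backend(state=state, mode='Real-Time Commentary'):
#       print(f'{start:.1f}s-{stop:.1f}s: {response}')
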
with gr.Blocks() as demo:
    gr.Markdown("## LiveCC Real-Time Commentary and Conversation - Gradio Demo")
    gr.Markdown("#### [LiveCC: Learning Video LLM with Streaming Speech Transcription at Scale](https://showlab.github.io/livecc/)")
    gr_state = gr.State({}, render=False) # holds all useful state, including the KV cache
    gr_video_state = gr.JSON({}, visible=False) # records only the video state; belongs to gr_state but kept lightweight
    gr_static_trigger = gr.Number(value=0, visible=False) # controls whether streaming starts or stops
    gr_dynamic_trigger = gr.Number(value=0, visible=False) # flipped to drive continuous refresh
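    # How the two triggers interact (as wired up below): loading a video sets both to 1 via
    # gr_video.change; gr_dynamic_trigger.change then fetches the current playback position and
    # runs gr_for_streaming, which flips gr_dynamic_trigger (1 - value) when it finishes, so the
    # listener fires again and the commentary keeps refreshing.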
    with gr.Row():
        with gr.Column():
            gr_video = gr.Video(
                label="video",
                elem_id="gr_video",
                visible=True,
                sources=['upload'],
                autoplay=True,
                include_audio=False,
                width=720,
                height=480
            )
            gr_examples = gr.Examples(
                examples=[
                    'demo/sources/howto_fix_laptop_mute_720p.mp4',
                    'demo/sources/howto_fix_laptop_mute_1080p.mp4',
                ],
                inputs=[gr_video],
            )
            gr_clean_button = gr.Button("Clean (Press me before changing video)", elem_id="gr_button")
        with gr.Column():
            with gr.Row():
                gr_radio_mode = gr.Radio(label="Select Mode", choices=["Real-Time Commentary", "Conversation"], elem_id="gr_radio_mode", value='Real-Time Commentary', interactive=True)
            def gr_chatinterface_fn(message, history, state, mode):
                response, state = gradio_backend(query=message, state=state, mode=mode)
                return response, state

            def gr_chatinterface_chatbot_clear_fn():
                return {}, {}, 0, 0

            gr_chatinterface = gr.ChatInterface(
                fn=gr_chatinterface_fn,
                type="messages",
                additional_inputs=[gr_state, gr_radio_mode],
                additional_outputs=[gr_state],
            )
            gr_chatinterface.chatbot.clear(fn=gr_chatinterface_chatbot_clear_fn, outputs=[gr_video_state, gr_state, gr_static_trigger, gr_dynamic_trigger])
            # the leading [] clears the chatbot, so it must also be listed in outputs
            gr_clean_button.click(
                fn=lambda: [[], *gr_chatinterface_chatbot_clear_fn()],
                outputs=[gr_chatinterface.chatbot, gr_video_state, gr_state, gr_static_trigger, gr_dynamic_trigger],
            )
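            # Note: additional_inputs feeds gr_state and the mode radio into gr_chatinterface_fn,
            # and additional_outputs writes the updated state back, so the backend's state
            # (including the KV cache) survives across chat turns.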
            def gr_for_streaming(history: list[gr.ChatMessage], video_state: dict, state: dict, mode: str, static_trigger: int, dynamic_trigger: int):
                # if static_trigger == 0:
                #     return gr_chatinterface_chatbot_clear_fn()
                # if video_state['video_path'] != state.get('video_path', None):
                #     return gr_chatinterface_chatbot_clear_fn()
                state.update(video_state)
                query, assistant_waiting_message = None, None
                for message in history[::-1]:
                    if message['role'] == 'user':
                        if message['metadata'] is None or message['metadata'].get('status', '') == '':
                            query = message['content']
                            if message['metadata'] is None:
                                message['metadata'] = {}
                            message['metadata']['status'] = 'pending'
                            continue
                        if query is not None: # put others as done
                            message['metadata']['status'] = 'done'
                    elif message['content'] == GradioBackend.waiting_video_response:
                        assistant_waiting_message = message
                for (start_timestamp, stop_timestamp), response, state in gradio_backend(query=query, state=state, mode=mode):
                    if start_timestamp >= 0:
                        response_with_timestamp = f'{start_timestamp:.1f}s-{stop_timestamp:.1f}s: {response}'
                        if assistant_waiting_message is None:
                            history.append(gr.ChatMessage(role="assistant", content=response_with_timestamp))
                        else:
                            assistant_waiting_message['content'] = response_with_timestamp
                            assistant_waiting_message = None
                        yield history, state, dynamic_trigger
                yield history, state, 1 - dynamic_trigger
            js_video_timestamp_fetcher = """
                (state, video_state) => {
                    const videoEl = document.querySelector("#gr_video video");
                    return { video_path: videoEl.currentSrc, video_timestamp: videoEl.currentTime };
                }
            """
            gr_video.change(fn=lambda: [1, 1], outputs=[gr_static_trigger, gr_dynamic_trigger])

            def gr_get_video_state(video_state):
                print(video_state)
                if 'file=' in video_state['video_path']:
                    video_state['video_path'] = video_state['video_path'].split('file=')[1]
                return video_state
            gr_dynamic_trigger.change(
                fn=gr_get_video_state,
                inputs=[gr_video_state],
                outputs=[gr_video_state],
                js=js_video_timestamp_fetcher
            ).then(
                fn=gr_for_streaming,
                inputs=[gr_chatinterface.chatbot, gr_video_state, gr_state, gr_radio_mode, gr_static_trigger, gr_dynamic_trigger],
                outputs=[gr_chatinterface.chatbot, gr_state, gr_dynamic_trigger],
            )
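            # End-to-end streaming loop: video change -> triggers set -> JS fetches the playback
            # position -> gr_get_video_state normalizes the path -> gr_for_streaming streams
            # timestamped commentary into the chatbot and flips gr_dynamic_trigger to repeat.
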
demo.queue(max_size=5, default_concurrency_limit=5)
demo.launch(share=True)

# --- for streaming TTS (currently disabled) ---
# NOTE: enabling this would also require `import numpy as np` and a
# `gradio_backend.contents` queue, neither of which is defined above.
# gr_tts = gr.Audio(visible=False, elem_id="gr_tts", streaming=True, autoplay=True)
# def tts():
#     while True:
#         contents = ''
#         while not gradio_backend.contents.empty():
#             content = gradio_backend.contents.get()
#             contents += ' ' + content.rstrip(' ...')
#         contents = contents.strip()
#         if contents:
#             generator = gradio_backend.audio_pipeline(contents, voice='af_heart', speed=1.2)
#             for _, _, audio_torch in generator:
#                 audio_np = audio_torch.cpu().numpy()
#                 max_val = np.max(np.abs(audio_np))
#                 if max_val > 0:
#                     audio_np = audio_np / max_val
#                 audio_int16 = (audio_np * 32767).astype(np.int16)
#                 yield (24000, audio_int16)
# gr_video.change(fn=tts, outputs=[gr_tts])