|
import glob |
|
import streamlit as st |
|
import wget |
|
from PIL import Image |
|
import torch |
|
import cv2 |
|
import os |
|
import time |
|
|
|
st.set_page_config(layout="wide")



# Default YOLOv5 weights path; main() may replace this with a user-supplied model.
cfg_model_path = 'models/yolov5s.pt'

# Global model handle, populated by main() via load_model() and read by infer_image().
model = None

# Detection confidence threshold; overwritten by the sidebar slider in main().
confidence = .25
|
|
|
|
|
def image_input(data_src):
    """Render the image workflow: select/upload an image and show it beside the prediction.

    Args:
        data_src: 'Sample data' to browse bundled images in data/sample_images,
            any other value to use the sidebar file uploader.
    """
    img_file = None
    if data_src == 'Sample data':
        # Sort so the slider-to-file mapping is deterministic across runs/platforms.
        img_path = sorted(glob.glob('data/sample_images/*'))
        if not img_path:
            # Guard: st.slider raises when max_value (0) < min_value (1).
            st.warning("No sample images found in data/sample_images.")
            return
        img_slider = st.slider("Select a test image.", min_value=1, max_value=len(img_path), step=1)
        img_file = img_path[img_slider - 1]
    else:
        img_bytes = st.sidebar.file_uploader("Upload an image", type=['png', 'jpeg', 'jpg', "jfif", "iff"])
        if img_bytes:
            # Persist the upload so it can be handed to the model by path.
            os.makedirs("data/uploaded_data", exist_ok=True)
            img_file = "data/uploaded_data/upload." + img_bytes.name.split('.')[-1]
            Image.open(img_bytes).save(img_file)

    if img_file:
        col1, col2 = st.columns(2)
        with col1:
            st.image(img_file, caption="Selected Image")
        with col2:
            img = infer_image(img_file)
            st.image(img, caption="Model prediction")
|
|
|
|
|
def video_input(data_src):
    """Render the video workflow: run every frame through the model with a live FPS readout.

    Args:
        data_src: 'Sample data' to use the bundled sample video, any other
            value to use the sidebar file uploader.
    """
    vid_file = None
    if data_src == 'Sample data':
        vid_file = "data/sample_videos/sample.mp4"
    else:
        vid_bytes = st.sidebar.file_uploader("Upload a video", type=['mp4', 'mpv', 'avi'])
        if vid_bytes:
            # Persist the upload so OpenCV can open it by path.
            os.makedirs("data/uploaded_data", exist_ok=True)
            vid_file = "data/uploaded_data/upload." + vid_bytes.name.split('.')[-1]
            with open(vid_file, 'wb') as out:
                out.write(vid_bytes.read())

    if vid_file:
        cap = cv2.VideoCapture(vid_file)
        custom_size = st.sidebar.checkbox("Custom frame size")
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if custom_size:
            width = st.sidebar.number_input("Width", min_value=120, step=20, value=width)
            height = st.sidebar.number_input("Height", min_value=120, step=20, value=height)

        fps = 0
        st1, st2, st3 = st.columns(3)
        with st1:
            st.markdown("## Height")
            st.markdown(f"{height}")
        with st2:
            st.markdown("## Width")
            st.markdown(f"{width}")
        with st3:
            st.markdown("## FPS")
            st3_text = st.markdown(f"{fps}")

        st.markdown("---")
        output = st.empty()
        prev_time = time.time()
        last_fps_update = prev_time

        # try/finally guarantees the capture is released even if inference raises.
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    st.write("Can't read frame, stream ended? Exiting ....")
                    break

                frame = cv2.resize(frame, (width, height))
                # Model/PIL expect RGB; OpenCV decodes as BGR.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                output_img = infer_image(frame)
                output.image(output_img)

                curr_time = time.time()
                elapsed = curr_time - prev_time
                # Guard against a zero-length interval (clock resolution).
                if elapsed > 0:
                    fps = 1 / elapsed
                prev_time = curr_time

                # Throttle the FPS widget to one update per second to avoid
                # re-rendering the sidebar metric on every frame.
                if curr_time - last_fps_update >= 1:
                    st3_text.markdown(f"**{fps:.2f}**")
                    last_fps_update = curr_time
        finally:
            cap.release()
|
|
|
|
|
def infer_image(img, size=None):
    """Run the global YOLOv5 model on *img* and return the annotated result.

    Args:
        img: Image path, PIL image, or ndarray accepted by the model.
        size: Optional inference size forwarded to the model when truthy.

    Returns:
        PIL.Image with detection boxes rendered onto it.
    """
    # Apply the globally-selected confidence threshold before inference.
    model.conf = confidence
    if size:
        result = model(img, size=size)
    else:
        result = model(img)
    # render() draws the boxes in place on result.ims.
    result.render()
    return Image.fromarray(result.ims[0])
|
|
|
|
|
@st.cache_resource
def load_model(path, device):
    """Load custom YOLOv5 weights from *path* via torch hub and move them to *device*.

    Cached by Streamlit so the hub download/load happens once per (path, device).
    """
    loaded = torch.hub.load('ultralytics/yolov5', 'custom', path=path, force_reload=True)
    loaded.to(device)
    print("model to ", device)
    return loaded
|
|
|
|
|
@st.cache_resource
def download_model(url):
    """Download model weights from *url* into the models/ directory.

    Returns:
        Local path of the downloaded file, as reported by wget.
    """
    # wget.download fails if the output directory does not exist; create it first.
    os.makedirs("models", exist_ok=True)
    model_file = wget.download(url, out="models")
    return model_file
|
|
|
|
|
def get_user_model():
    """Sidebar widget for supplying custom weights via file upload or URL.

    Returns:
        Local path to a .pt weights file, or None if nothing valid was provided.
    """
    model_src = st.sidebar.radio("Model source", ["file upload", "url"])
    model_file = None
    if model_src == "file upload":
        model_bytes = st.sidebar.file_uploader("Upload a model file", type=['pt'])
        if model_bytes:
            # Ensure the destination directory exists before writing the upload.
            os.makedirs("models", exist_ok=True)
            model_file = "models/uploaded_" + model_bytes.name
            with open(model_file, 'wb') as out:
                out.write(model_bytes.read())
    else:
        url = st.sidebar.text_input("model url")
        if url:
            model_file_ = download_model(url)
            # endswith(".pt") is stricter than split(".")[-1] == "pt", which
            # wrongly accepted a file literally named "pt" with no extension.
            if model_file_.endswith(".pt"):
                model_file = model_file_

    return model_file
|
|
|
def main():
    """Build the dashboard: model selection, device/confidence settings, then input routing."""
    global model, confidence, cfg_model_path

    st.title("Object Recognition Dashboard")
    st.sidebar.title("Settings")

    # Model source: bundled demo weights or user-supplied weights.
    model_src = st.sidebar.radio("Select yolov5 weight file", ["Use our demo model 5s", "Use your own model"])
    if model_src == "Use your own model":
        user_model_path = get_user_model()
        if user_model_path:
            cfg_model_path = user_model_path

    st.sidebar.text(cfg_model_path.split("/")[-1])
    st.sidebar.markdown("---")

    if not os.path.isfile(cfg_model_path):
        st.warning("Model file not available! Please add it to the models folder.", icon="⚠️")
    else:
        # Offer CUDA only when it is actually available; otherwise show the
        # radio disabled so the user sees why only CPU is possible.
        device_option = st.sidebar.radio(
            "Select Device", ['cpu', 'cuda'],
            disabled=not torch.cuda.is_available(), index=0)

        model = load_model(cfg_model_path, device_option)

        confidence = st.sidebar.slider('Confidence', min_value=0.1, max_value=1.0, value=.45)

        # Optional class filtering: restrict detections to the selected labels.
        if st.sidebar.checkbox("Custom Classes"):
            model_names = list(model.names.values())
            assigned_class = st.sidebar.multiselect("Select Classes", model_names, default=[model_names[0]])
            classes = [model_names.index(name) for name in assigned_class]
            model.classes = classes
        else:
            model.classes = list(model.names.keys())

        st.sidebar.markdown("---")

        input_option = st.sidebar.radio("Select input type: ", ['image', 'video'])
        data_src = st.sidebar.radio("Select input source: ", ['Sample data', 'Upload your own data'])

        if input_option == 'image':
            image_input(data_src)
        else:
            video_input(data_src)
|
|
|
|
|
if __name__ == "__main__":

    # Suppress SystemExit so the script exits quietly instead of printing a
    # traceback — presumably guarding against Streamlit/CLI-triggered exits;
    # TODO(review): confirm which caller actually raises it.
    try:

        main()

    except SystemExit:

        pass
|
|