|
|
|
import pandas as pd |
|
|
|
pd.set_option('display.max_columns', None) |
|
pd.set_option('display.max_rows', None) |
|
|
|
|
|
df = pd.read_csv(f"Spotify_Youtube.csv") |
|
|
|
|
|
df = df.drop(['Album_type','Uri','Duration_ms'], axis=1) |
|
|
|
|
|
df = df.drop_duplicates() |
|
df = df.dropna() |
|
|
|
import pandas as pd |
|
from sklearn.preprocessing import MinMaxScaler |
|
|
|
def preprocess_song_data(df): |
|
|
|
required_columns = [ |
|
'Title', 'Artist', 'Valence', 'Energy', 'Danceability', |
|
'Tempo', 'Acousticness', 'Instrumentalness' |
|
] |
|
df_clean = df.dropna(subset=required_columns).copy() |
|
|
|
|
|
popularity_cols = ['Likes', 'Views', 'Comments', 'Stream'] |
|
for col in popularity_cols: |
|
if col in df_clean.columns: |
|
df_clean[col] = df_clean[col].fillna(0) |
|
|
|
scaler = MinMaxScaler() |
|
df_clean[popularity_cols] = scaler.fit_transform(df_clean[popularity_cols]) |
|
|
|
|
|
audio_feature_cols = [ |
|
'Valence', 'Energy', 'Danceability', 'Tempo', |
|
'Acousticness', 'Instrumentalness' |
|
] |
|
df_clean['audio_vector'] = df_clean[audio_feature_cols].values.tolist() |
|
|
|
return df_clean, audio_feature_cols, popularity_cols |
|
|
|
df_clean, audio_feature_cols, popularity_cols = preprocess_song_data(df) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, AutoModelForSeq2SeqLM |
|
from langchain.llms import HuggingFacePipeline |
|
import torch |
|
|
|
model_name = "deepseek-ai/deepseek-llm-7b-chat" |
|
|
|
|
|
|
|
|
|
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True) |
|
|
|
model = AutoModelForCausalLM.from_pretrained( |
|
model_name, |
|
trust_remote_code=True, |
|
torch_dtype=torch.float16, |
|
device_map="auto", |
|
offload_folder="offload" |
|
) |
|
|
|
hf_pipeline = pipeline( |
|
"text-generation", |
|
model=model, |
|
tokenizer=tokenizer, |
|
max_new_tokens=300 |
|
) |
|
|
|
llm = HuggingFacePipeline(pipeline=hf_pipeline) |
|
|
|
import re, json |
|
|
|
def parse_user_input(user_input): |
|
prompt = f""" |
|
You are an expert music assistant trained to recommend songs for use in creative projects like ads, short films, social media videos, and campaigns. |
|
|
|
Your job is to extract structured information from the user's input so that we can recommend songs from our database based on Spotify audio features and YouTube metadata. |
|
|
|
Only return the following fields in JSON format: |
|
- "mood": overall emotion (e.g., sad, happy, dramatic) |
|
- "context": what kind of scene or use (e.g., wedding, breakup scene, brand ad) |
|
- "preference": how to sort (e.g., likes, views, popularity) |
|
- "reference_song": name of a song the user wants similar songs to |
|
- "genre": intended musical style (e.g., acoustic, pop, ambient) |
|
- "instrumental": 'yes' if vocals aren't needed, otherwise 'no' |
|
- "tempo": slow / medium / fast |
|
- "artist": if they want something from a specific artist |
|
- "gender": if they prefer male / female vocal |
|
- "limit": how many results to show (as a number) |
|
|
|
{{ |
|
"mood": "...", |
|
"context": "...", |
|
"preference": "...", |
|
"reference_song": "...", |
|
"genre": "...", |
|
"instrumental": "...", |
|
"tempo": "...", |
|
"artist": "...", |
|
"gender": "...", |
|
"limit": "..." |
|
}} |
|
|
|
User input: "{user_input}" |
|
|
|
Respond ONLY with the JSON. Do not explain anything. |
|
""" |
|
output = llm(prompt) |
|
print("π§ Raw LLM Output:\n", output) |
|
|
|
try: |
|
json_blocks = re.findall(r"\{[\s\S]*?\}", output) |
|
json_str = json_blocks[-1] if json_blocks else "{}" |
|
parsed = json.loads(json_str) |
|
|
|
|
|
try: |
|
parsed["limit"] = str(int(parsed["limit"])) |
|
except: |
|
parsed["limit"] = "5" |
|
|
|
|
|
expected_keys = ["mood", "context", "preference", "reference_song", "genre", "instrumental", "tempo", "artist", "gender", "limit"] |
|
for key in expected_keys: |
|
if key not in parsed: |
|
parsed[key] = "" if key != "limit" else "5" |
|
|
|
except Exception as e: |
|
print("β JSON parsing failed:", e) |
|
parsed = {key: "" for key in expected_keys} |
|
parsed["limit"] = "5" |
|
|
|
return parsed |
|
|
|
|
|
def mood_to_valence_range(mood): |
|
mood = mood.strip().lower() |
|
mood_map = { |
|
"sad": (0.0, 0.3), |
|
"melancholy": (0.2, 0.4), |
|
"emotional": (0.3, 0.5), |
|
"chill": (0.4, 0.6), |
|
"nostalgic": (0.3, 0.5), |
|
"neutral": (0.4, 0.6), |
|
"hopeful": (0.5, 0.7), |
|
"happy": (0.6, 0.85), |
|
"cheerful": (0.7, 0.9), |
|
"upbeat": (0.7, 1.0), |
|
"energetic": (0.8, 1.0), |
|
"romantic": (0.4, 0.7), |
|
"heartbreak": (0.1, 0.3), |
|
"dark": (0.0, 0.2), |
|
"dramatic": (0.3, 0.5), |
|
"angry": (0.0, 0.3), |
|
"inspiring": (0.6, 0.85), |
|
"relaxing": (0.4, 0.6), |
|
"peaceful": (0.3, 0.5), |
|
"epic": (0.5, 0.8) |
|
} |
|
|
|
|
|
return mood_map.get(mood, (0.0, 1.0)) |
|
|
|
|
|
|
|
import gender_guesser.detector as gender |
|
gender_detector = gender.Detector() |
|
|
|
def infer_gender(artist_name): |
|
try: |
|
first_name = artist_name.split()[0] |
|
guess = gender_detector.get_gender(first_name) |
|
if "female" in guess: return "female" |
|
if "male" in guess: return "male" |
|
return "unknown" |
|
except: |
|
return "unknown" |
|
|
|
df_clean["Gender"] = df_clean["Artist"].apply(infer_gender) |
|
|
|
genre_filters = { |
|
"pop": lambda df: (df["Danceability"] > 0.6) & (df["Energy"] > 0.5) & (df["Acousticness"] < 0.5), |
|
"rock": lambda df: (df["Energy"] > 0.75) & (df["Acousticness"] < 0.4), |
|
"jazz": lambda df: (df["Acousticness"] > 0.6) & (df["Instrumentalness"] > 0.5) & (df["Energy"] < 0.6), |
|
"rnb": lambda df: (df["Danceability"] > 0.6) & (df["Speechiness"] < 0.33) & (df["Acousticness"] > 0.3), |
|
"acoustic": lambda df: df["Acousticness"] > 0.8, |
|
"hip hop": lambda df: (df["Speechiness"] > 0.4) & (df["Danceability"] > 0.6), |
|
"edm": lambda df: (df["Danceability"] > 0.7) & (df["Energy"] > 0.7) & (df["Acousticness"] < 0.3), |
|
"indie": lambda df: (df["Acousticness"] > 0.4) & (df["Energy"] < 0.6), |
|
"dangdut": lambda df: (df["Danceability"] > 0.6) & (df["Speechiness"] < 0.4) & (df["Acousticness"] > 0.4) & (df["Tempo"].between(70, 130)), |
|
"keroncong": lambda df: (df["Acousticness"] > 0.8) & (df["Energy"] < 0.5) & (df["Instrumentalness"] > 0.3), |
|
} |
|
|
|
|
|
def recommend_from_dataframe(parsed, df_clean, randomize = False): |
|
df_filtered = df_clean.copy() |
|
filters_applied = set() |
|
top_n = int(parsed["limit"]) |
|
sort_col = parsed.get("preference", "Likes") or "Likes" |
|
if sort_col not in df_clean.columns: |
|
sort_col = "Likes" |
|
|
|
print("π΅ Total songs before filtering:", len(df_filtered)) |
|
|
|
|
|
val_min, val_max = mood_to_valence_range(parsed["mood"]) |
|
df_filtered = df_filtered[(df_filtered["Valence"] >= val_min) & (df_filtered["Valence"] <= val_max)] |
|
filters_applied.add("Valence") |
|
print("π― Applied mood β Valence filter:", len(df_filtered)) |
|
|
|
|
|
genre = parsed["genre"].strip().lower() |
|
if genre not in ["", "...", "none", "n/a","null","unknown"] and genre in genre_filters and not {"Acousticness", "Energy", "Instrumentalness", "Speechiness", "Tempo"} & filters_applied: |
|
try: |
|
genre_mask = genre_filters[genre](df_filtered) |
|
df_filtered = df_filtered[genre_mask] |
|
filters_applied.update(genre_filters[genre].__code__.co_names) |
|
print(f"π― Applied genre filter for '{genre}':", len(df_filtered)) |
|
except Exception as e: |
|
print(f"β οΈ Failed to apply genre filter for '{genre}':", e) |
|
else: |
|
print("β οΈ Skipped genre filter due to value or overlap.") |
|
|
|
|
|
instrumental = parsed["instrumental"].strip().lower() |
|
if instrumental == "yes" and "Instrumentalness" not in filters_applied: |
|
df_filtered = df_filtered[df_filtered["Instrumentalness"] > 0.000002] |
|
filters_applied.add("Instrumentalness") |
|
print("π― Applied instrumental filter:", len(df_filtered)) |
|
else: |
|
print("β οΈ Skipped instrumental filter (empty or overlap)") |
|
|
|
|
|
tempo = parsed["tempo"].strip().lower() |
|
if tempo not in ["", "...", "none", "n/a","null","unknown"] and "Tempo" not in filters_applied: |
|
if tempo == "fast": |
|
df_filtered = df_filtered[df_filtered["Tempo"] > 120] |
|
elif tempo == "slow": |
|
df_filtered = df_filtered[df_filtered["Tempo"] < 90] |
|
elif tempo == "medium": |
|
df_filtered = df_filtered[(df_filtered["Tempo"] >= 90) & (df_filtered["Tempo"] <= 120)] |
|
filters_applied.add("Tempo") |
|
print("π― Applied tempo filter:", len(df_filtered)) |
|
else: |
|
print("β οΈ Skipped tempo filter (empty or overlap)") |
|
|
|
|
|
gender = parsed["gender"].strip().lower() |
|
if gender not in ["", "...", "none", "n/a","null"]: |
|
if "Gender" in df_filtered.columns: |
|
df_filtered = df_filtered[df_filtered["Gender"].str.lower() == gender] |
|
filters_applied.add("Gender") |
|
print("π― Applied gender filter:", len(df_filtered)) |
|
else: |
|
print("β οΈ Gender column not found β skipping.") |
|
|
|
if randomize: |
|
df_sorted = df_filtered.sample(frac=1).head(top_n) |
|
else: |
|
df_sorted = df_filtered.sort_values(by=sort_col, ascending=False).head(top_n) |
|
|
|
print("β
Final songs returned:", len(df_sorted)) |
|
return df_sorted |
|
|
|
from langchain.prompts import PromptTemplate |
|
|
|
reference_song_prompt = PromptTemplate.from_template(""" |
|
You are a music expert. A user has requested songs similar to a reference song that may not exist in our database. |
|
|
|
Your job is to describe the following audio features of the reference song as accurately as possible, based on your knowledge: |
|
|
|
- mood (e.g. sad, energetic, chill) |
|
- genre (e.g. pop, acoustic, EDM) |
|
- instrumental (yes/no) |
|
- tempo (slow, medium, fast) |
|
- artist (who performed the song) |
|
- gender (male, female, group, unknown) |
|
|
|
Respond ONLY in the following JSON format: |
|
{ |
|
"mood": "...", |
|
"genre": "...", |
|
"instrumental": "...", |
|
"tempo": "...", |
|
"artist": "...", |
|
"gender": "..." |
|
} |
|
|
|
Reference song: "{reference_song}" |
|
|
|
Return only the JSON. Do not include any other explanation. |
|
""") |
|
|
|
import re |
|
import json |
|
|
|
def recommend_by_reference_song(parsed, df_clean, audio_feature_cols, llm, reference_song_prompt): |
|
reference_title = parsed["reference_song"].strip().lower() |
|
top_n = int(parsed.get("limit", 5)) |
|
sort_col = parsed.get("preference", "Likes") or "Likes" |
|
if sort_col not in df_clean.columns: |
|
sort_col = "Likes" |
|
|
|
|
|
matches = df_clean[df_clean["Title"].str.lower().str.contains(reference_title, na=False)] |
|
|
|
if not matches.empty: |
|
print("β
Reference song found in catalog β using similarity-based recommendation.") |
|
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import numpy as np |
|
|
|
ref_vec = np.array(matches[audio_feature_cols].values[0]).reshape(1, -1) |
|
all_vecs = np.array(df_clean[audio_feature_cols]) |
|
sims = cosine_similarity(ref_vec, all_vecs)[0] |
|
|
|
df_clean["similarity"] = sims |
|
results = df_clean[df_clean["Title"].str.lower() != reference_title] |
|
return results.sort_values("similarity", ascending=False).head(top_n) |
|
|
|
else: |
|
print("π§ Reference song NOT in catalog β asking LLM to describe its features.") |
|
|
|
prompt = reference_song_prompt.format(reference_song=parsed["reference_song"]) |
|
llm_output = llm(prompt) |
|
print("π LLM Output:", llm_output) |
|
|
|
|
|
try: |
|
json_str = re.search(r"\{[\s\S]*?\}", llm_output).group() |
|
ref_features = json.loads(json_str) |
|
except: |
|
print("β οΈ Failed to parse LLM output. Using fallback.") |
|
ref_features = { |
|
"mood": "", "genre": "", "instrumental": "", "tempo": "", |
|
"artist": "", "gender": "", "limit": parsed.get("limit", "5") |
|
} |
|
|
|
|
|
ref_features["limit"] = parsed.get("limit", "5") |
|
ref_features["preference"] = parsed.get("preference", "Likes") |
|
|
|
print("π― Parsed features from reference song:", ref_features) |
|
|
|
return recommend_from_dataframe(ref_features, df_clean) |
|
|
|
def extract_explanation(text): |
|
split_text = text.split("Write a 2-sentence explanation.") |
|
return split_text[1].strip() if len(split_text) > 1 else text.strip() |
|
|
|
def generate_explanation(user_input, song_row): |
|
prompt = f""" |
|
User asked: "{user_input}" |
|
Why is this song a good fit? |
|
|
|
- Title: {song_row['Title']} |
|
- Artist: {song_row['Artist']} |
|
- Valence: {song_row['Valence']} |
|
- Tempo: {song_row['Tempo']} |
|
- Instrumentalness: {song_row['Instrumentalness']} |
|
- Description: {song_row.get('Description', '')} |
|
|
|
Write a 2-sentence explanation. |
|
""" |
|
raw = llm(prompt) |
|
|
|
|
|
if isinstance(raw, str): |
|
return extract_explanation(raw) |
|
|
|
|
|
if isinstance(raw, list) and "generated_text" in raw[0]: |
|
return extract_explanation(raw[0]["generated_text"]) |
|
|
|
|
|
return str(raw) |
|
|
|
def handle_query(input_text): |
|
print("π₯ User input received:", input_text) |
|
|
|
try: |
|
parsed = parse_user_input(input_text) |
|
print("β
Parsed:", parsed) |
|
|
|
ref_song = parsed.get("reference_song") or "" |
|
if ref_song.strip().lower() not in ["", "...", "none", "n/a"]: |
|
results = recommend_by_reference_song(parsed, df_clean, audio_feature_cols, llm, reference_song_prompt) |
|
else: |
|
results = recommend_from_dataframe(parsed, df_clean) |
|
|
|
results = results.copy() |
|
|
|
print("β
Final results shape:", results.shape) |
|
|
|
|
|
return results, results |
|
|
|
except Exception as e: |
|
print("β ERROR:", e) |
|
empty_df = pd.DataFrame(columns=["Title", "Artist", "Explanation"]) |
|
return empty_df, empty_df |
|
|
|
|
|
|
|
import gradio as gr |
|
import pandas as pd |
|
import datetime |
|
|
|
|
|
feedback_df = pd.DataFrame(columns=["Query", "Title", "Artist", "Feedback"]) |
|
|
|
|
|
def submit_feedback(query, title, artist, feedback_type): |
|
global feedback_df |
|
new_feedback = pd.DataFrame([{ |
|
"Query": query, |
|
"Title": title, |
|
"Artist": artist, |
|
"Feedback": feedback_type |
|
}]) |
|
feedback_df = pd.concat([feedback_df, new_feedback], ignore_index=True) |
|
return f"β
Feedback recorded for {title}: {feedback_type}" |
|
|
|
def save_results(results_df): |
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
path = f"recommendations_{timestamp}.csv" |
|
results_df.to_csv(path, index=False) |
|
return path |
|
|
|
def save_feedback(): |
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
path = f"feedback_log_{timestamp}.csv" |
|
feedback_df.to_csv(path, index=False) |
|
return path |
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("## π§ Song Recommender") |
|
|
|
|
|
user_input = gr.Textbox(label="What kind of song are you looking for?") |
|
btn_run = gr.Button("π Recommend Songs | It will takes 5-15 minutes (or longer) for each prompt. Please be patientπ") |
|
|
|
|
|
song_output = gr.Dataframe(label="π΅ Recommended Songs") |
|
results_state = gr.State() |
|
|
|
|
|
download_btn = gr.Button("β¬οΈ Download Recommendations CSV", visible=True) |
|
download_file = gr.File() |
|
|
|
|
|
def run_query_and_show_download(input_text): |
|
df_display, df_full = handle_query(input_text) |
|
return df_display, df_full, gr.update(visible=True) |
|
|
|
btn_run.click( |
|
fn=run_query_and_show_download, |
|
inputs=[user_input], |
|
outputs=[song_output, results_state, download_btn] |
|
) |
|
|
|
download_btn.click(fn=save_results, inputs=[results_state], outputs=download_file) |
|
|
|
|
|
gr.Markdown("### π§ Optional Feedback") |
|
toggle_feedback_btn = gr.Button("βοΈ Give Feedback") |
|
feedback_group = gr.Group(visible=False) |
|
|
|
with feedback_group: |
|
with gr.Row(): |
|
feedback_title = gr.Textbox(label="Song Title") |
|
feedback_artist = gr.Textbox(label="Artist") |
|
|
|
with gr.Row(): |
|
btn_like = gr.Button("π Relevant") |
|
btn_dislike = gr.Button("π Not Relevant") |
|
|
|
feedback_response = gr.Textbox(label="Feedback Message") |
|
|
|
toggle_feedback_btn.click(lambda: gr.update(visible=True), None, outputs=[feedback_group]) |
|
|
|
btn_like.click( |
|
fn=submit_feedback, |
|
inputs=[user_input, feedback_title, feedback_artist, gr.State("π")], |
|
outputs=feedback_response |
|
) |
|
|
|
btn_dislike.click( |
|
fn=submit_feedback, |
|
inputs=[user_input, feedback_title, feedback_artist, gr.State("π")], |
|
outputs=feedback_response |
|
) |
|
|
|
|
|
download_feedback_btn = gr.Button("β¬οΈ Download Feedback Log CSV", visible=True) |
|
download_feedback_file = gr.File() |
|
download_feedback_btn.click(fn=save_feedback, outputs=download_feedback_file) |
|
|
|
demo.launch() |