Update main.py
Browse files
main.py
CHANGED
@@ -16,15 +16,16 @@ logging.basicConfig(level=logging.INFO,
|
|
16 |
|
17 |
logger = logging.getLogger(__name__)
|
18 |
|
19 |
-
|
20 |
# MongoDB connection setup
|
21 |
db_name = 'property-listing'
|
22 |
collection_name = 'activities'
|
|
|
23 |
connection_string = os.getenv('CONNECTION_STRING')
|
24 |
|
25 |
client = MongoClient(connection_string)
|
26 |
db = client[db_name]
|
27 |
collection = db[collection_name]
|
|
|
28 |
|
29 |
# Load pre-trained SVD model and user-item matrix columns
|
30 |
svd = joblib.load('svd_model.joblib')
|
@@ -67,28 +68,30 @@ async def check_for_new_session():
|
|
67 |
logger.error(f"Error in check_for_new_session: {e}")
|
68 |
await asyncio.sleep(5) # Wait before retrying
|
69 |
|
70 |
-
def generate_recommendations_for_session(session_id):
|
71 |
try:
|
72 |
-
# Retrieve all documents for the given session
|
73 |
session_data = list(collection.find({'sessionId': session_id}))
|
74 |
if not session_data:
|
75 |
logger.warning(f"No data found for session {session_id}")
|
76 |
return None
|
77 |
|
78 |
-
# Convert session data to a DataFrame
|
79 |
raw_df = pd.DataFrame(session_data)
|
80 |
-
|
81 |
-
# Debug: Print column names
|
82 |
logger.debug(f"Columns in raw_df: {raw_df.columns.tolist()}")
|
83 |
-
|
84 |
-
# Check if required columns exist
|
85 |
required_columns = ['id', 'action']
|
86 |
missing_columns = [col for col in required_columns if col not in raw_df.columns]
|
87 |
if missing_columns:
|
88 |
logger.error(f"Missing required columns: {missing_columns}")
|
89 |
return None
|
90 |
|
91 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
92 |
if 'duration' in raw_df.columns:
|
93 |
aggregated_data = raw_df.groupby(['id', 'action']).agg(
|
94 |
presence=('action', 'size'),
|
@@ -99,7 +102,6 @@ def generate_recommendations_for_session(session_id):
|
|
99 |
presence=('action', 'size')
|
100 |
).reset_index()
|
101 |
|
102 |
-
# Create pivot table
|
103 |
pivot_columns = ['presence', 'total_duration'] if 'duration' in raw_df.columns else ['presence']
|
104 |
pivot_df = aggregated_data.pivot_table(
|
105 |
index=['id'],
|
@@ -108,45 +110,100 @@ def generate_recommendations_for_session(session_id):
|
|
108 |
fill_value=0
|
109 |
)
|
110 |
|
111 |
-
# Flatten column names
|
112 |
pivot_df.columns = ['_'.join(col).strip() for col in pivot_df.columns.values]
|
113 |
|
114 |
-
# Ensure all expected columns exist in the pivot table
|
115 |
for col in ALL_COLUMNS:
|
116 |
if f'presence_{col}' not in pivot_df.columns and col != 'time_spent':
|
117 |
pivot_df[f'presence_{col}'] = 0
|
118 |
elif col == 'time_spent' and 'duration' in raw_df.columns and 'total_duration_time_spent' not in pivot_df.columns:
|
119 |
pivot_df['total_duration_time_spent'] = 0
|
120 |
|
121 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
122 |
pivot_df['interaction_score'] = pivot_df.apply(calculate_interaction_score, axis=1)
|
123 |
|
124 |
-
# Create a user vector based on the interaction scores
|
125 |
user_vector = pd.Series(index=user_item_matrix_columns, dtype=float).fillna(0)
|
126 |
for property_id, score in pivot_df['interaction_score'].items():
|
127 |
if property_id in user_vector.index:
|
128 |
user_vector[property_id] = score
|
129 |
|
130 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
131 |
user_vector_array = user_vector.values.reshape(1, -1)
|
132 |
user_latent = svd.transform(user_vector_array)
|
133 |
|
134 |
-
# Calculate similarity scores between the user vector and item factors
|
135 |
similarity_scores = cosine_similarity(user_latent, item_factors)
|
136 |
-
|
137 |
-
# Get the indices of the top 10 most similar items
|
138 |
top_indices = similarity_scores.argsort()[0][-10:][::-1]
|
139 |
-
|
140 |
-
# Get the corresponding property IDs for the top indices
|
141 |
recommendations = user_item_matrix_columns[top_indices].tolist()
|
142 |
|
143 |
return recommendations
|
144 |
|
145 |
except Exception as e:
|
146 |
-
logger.error(f"Error in generate_recommendations_for_session: {e}")
|
147 |
-
logger.debug(f"Raw dataframe info: {raw_df.info()}")
|
148 |
return None
|
149 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
150 |
|
151 |
def calculate_interaction_score(row):
|
152 |
try:
|
@@ -200,4 +257,3 @@ async def get_recommendations():
|
|
200 |
else:
|
201 |
logger.info("No recommendations available")
|
202 |
return []
|
203 |
-
|
|
|
16 |
|
17 |
logger = logging.getLogger(__name__)
|
18 |
|
|
|
19 |
# MongoDB connection setup
|
20 |
db_name = 'property-listing'
|
21 |
collection_name = 'activities'
|
22 |
+
user_recommendation_collection_name = 'user_recommendation_collection'
|
23 |
connection_string = os.getenv('CONNECTION_STRING')
|
24 |
|
25 |
client = MongoClient(connection_string)
|
26 |
db = client[db_name]
|
27 |
collection = db[collection_name]
|
28 |
+
user_recommendation_collection = db[user_recommendation_collection_name]
|
29 |
|
30 |
# Load pre-trained SVD model and user-item matrix columns
|
31 |
svd = joblib.load('svd_model.joblib')
|
|
|
68 |
logger.error(f"Error in check_for_new_session: {e}")
|
69 |
await asyncio.sleep(5) # Wait before retrying
|
70 |
|
71 |
+
def get_session_data(session_id):
|
72 |
try:
|
|
|
73 |
session_data = list(collection.find({'sessionId': session_id}))
|
74 |
if not session_data:
|
75 |
logger.warning(f"No data found for session {session_id}")
|
76 |
return None
|
77 |
|
|
|
78 |
raw_df = pd.DataFrame(session_data)
|
|
|
|
|
79 |
logger.debug(f"Columns in raw_df: {raw_df.columns.tolist()}")
|
80 |
+
|
|
|
81 |
required_columns = ['id', 'action']
|
82 |
missing_columns = [col for col in required_columns if col not in raw_df.columns]
|
83 |
if missing_columns:
|
84 |
logger.error(f"Missing required columns: {missing_columns}")
|
85 |
return None
|
86 |
|
87 |
+
return raw_df
|
88 |
+
|
89 |
+
except Exception as e:
|
90 |
+
logger.error(f"Error in get_session_data: {str(e)}")
|
91 |
+
return None
|
92 |
+
|
93 |
+
def create_pivot_table(raw_df):
|
94 |
+
try:
|
95 |
if 'duration' in raw_df.columns:
|
96 |
aggregated_data = raw_df.groupby(['id', 'action']).agg(
|
97 |
presence=('action', 'size'),
|
|
|
102 |
presence=('action', 'size')
|
103 |
).reset_index()
|
104 |
|
|
|
105 |
pivot_columns = ['presence', 'total_duration'] if 'duration' in raw_df.columns else ['presence']
|
106 |
pivot_df = aggregated_data.pivot_table(
|
107 |
index=['id'],
|
|
|
110 |
fill_value=0
|
111 |
)
|
112 |
|
|
|
113 |
pivot_df.columns = ['_'.join(col).strip() for col in pivot_df.columns.values]
|
114 |
|
|
|
115 |
for col in ALL_COLUMNS:
|
116 |
if f'presence_{col}' not in pivot_df.columns and col != 'time_spent':
|
117 |
pivot_df[f'presence_{col}'] = 0
|
118 |
elif col == 'time_spent' and 'duration' in raw_df.columns and 'total_duration_time_spent' not in pivot_df.columns:
|
119 |
pivot_df['total_duration_time_spent'] = 0
|
120 |
|
121 |
+
return pivot_df
|
122 |
+
|
123 |
+
except Exception as e:
|
124 |
+
logger.error(f"Error in create_pivot_table: {str(e)}")
|
125 |
+
return None
|
126 |
+
|
127 |
+
def create_user_vector(pivot_df):
|
128 |
+
try:
|
129 |
pivot_df['interaction_score'] = pivot_df.apply(calculate_interaction_score, axis=1)
|
130 |
|
|
|
131 |
user_vector = pd.Series(index=user_item_matrix_columns, dtype=float).fillna(0)
|
132 |
for property_id, score in pivot_df['interaction_score'].items():
|
133 |
if property_id in user_vector.index:
|
134 |
user_vector[property_id] = score
|
135 |
|
136 |
+
return user_vector
|
137 |
+
|
138 |
+
except Exception as e:
|
139 |
+
logger.error(f"Error in create_user_vector: {str(e)}")
|
140 |
+
return None
|
141 |
+
|
142 |
+
def generate_recommendations(user_vector):
|
143 |
+
try:
|
144 |
user_vector_array = user_vector.values.reshape(1, -1)
|
145 |
user_latent = svd.transform(user_vector_array)
|
146 |
|
|
|
147 |
similarity_scores = cosine_similarity(user_latent, item_factors)
|
|
|
|
|
148 |
top_indices = similarity_scores.argsort()[0][-10:][::-1]
|
|
|
|
|
149 |
recommendations = user_item_matrix_columns[top_indices].tolist()
|
150 |
|
151 |
return recommendations
|
152 |
|
153 |
except Exception as e:
|
154 |
+
logger.error(f"Error in generate_recommendations: {str(e)}")
|
|
|
155 |
return None
|
156 |
|
157 |
+
def generate_recommendations_for_session(session_id):
|
158 |
+
try:
|
159 |
+
raw_df = get_session_data(session_id)
|
160 |
+
if raw_df is None:
|
161 |
+
return None
|
162 |
+
|
163 |
+
pivot_df = create_pivot_table(raw_df)
|
164 |
+
if pivot_df is None:
|
165 |
+
return None
|
166 |
+
|
167 |
+
user_vector = create_user_vector(pivot_df)
|
168 |
+
if user_vector is None:
|
169 |
+
return None
|
170 |
+
|
171 |
+
recommendations = generate_recommendations(user_vector)
|
172 |
+
|
173 |
+
# Check if recommendations already exist for the session
|
174 |
+
existing_recommendations = user_recommendation_collection.find_one({"sessionId": session_id})
|
175 |
+
|
176 |
+
if existing_recommendations:
|
177 |
+
# Compare the existing recommendations with the new recommendations
|
178 |
+
if existing_recommendations["recommendations"] != recommendations:
|
179 |
+
# Update the recommendations if they are different
|
180 |
+
recommendation_data = {
|
181 |
+
"sessionId": session_id,
|
182 |
+
"recommendations": recommendations,
|
183 |
+
"timestamp": datetime.now()
|
184 |
+
}
|
185 |
+
user_recommendation_collection.update_one(
|
186 |
+
{"sessionId": session_id},
|
187 |
+
{"$set": recommendation_data}
|
188 |
+
)
|
189 |
+
logger.info(f"Updated recommendations for session {session_id}: {recommendations}")
|
190 |
+
else:
|
191 |
+
logger.info(f"Recommendations for session {session_id} remain unchanged")
|
192 |
+
else:
|
193 |
+
# Save the recommendations if they don't exist for the session
|
194 |
+
recommendation_data = {
|
195 |
+
"sessionId": session_id,
|
196 |
+
"recommendations": recommendations,
|
197 |
+
"timestamp": datetime.now()
|
198 |
+
}
|
199 |
+
user_recommendation_collection.insert_one(recommendation_data)
|
200 |
+
logger.info(f"Saved recommendations for session {session_id}: {recommendations}")
|
201 |
+
|
202 |
+
return recommendations
|
203 |
+
|
204 |
+
except Exception as e:
|
205 |
+
logger.error(f"Error in generate_recommendations_for_session: {str(e)}")
|
206 |
+
return None
|
207 |
|
208 |
def calculate_interaction_score(row):
|
209 |
try:
|
|
|
257 |
else:
|
258 |
logger.info("No recommendations available")
|
259 |
return []
|
|