nano_bot / app.py
Dooratre's picture
Update app.py
8720f86 verified
import os
import json
import time
import threading
import re
from flask import Flask, render_template, request, redirect, url_for, jsonify, session
from flask_socketio import SocketIO, emit
from functools import wraps
from database.db import fetch_json_from_github, fetch_authenticity_token_and_commit_oid, update_user_json_file
from database.db import fetch_json_from_github as fetch_user_json
from database.db import update_user_json_file as update_user_json
from database.db import fetch_authenticity_token_and_commit_oid as fetch_user_tokens
from database.msg import fetch_json_from_github as fetch_messages_json
from database.msg import update_user_json_file as update_messages_json
from database.msg import fetch_authenticity_token_and_commit_oid as fetch_messages_tokens
app = Flask(__name__)
app.secret_key = os.urandom(24)
socketio = SocketIO(app, cors_allowed_origins="*")
# Cache for user data and messages
user_data_cache = {}
messages_cache = {}
last_fetch_time = 0
CACHE_TIMEOUT = 60 # seconds
# Background thread for checking updates
thread = None
thread_lock = threading.Lock()
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'logged_in' not in session or not session['logged_in']:
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def background_thread():
"""Background thread for checking updates in messages"""
while True:
with thread_lock:
# Check for new messages every 10 seconds
fetch_and_update_messages()
socketio.sleep(10)
def fetch_and_update_messages():
"""Fetch messages and notify clients if there are changes"""
global messages_cache
# Only fetch if we have active users
if not user_data_cache:
return
for page_id in user_data_cache.get('page_ids', []):
result = fetch_messages_json()
if result['success']:
new_messages = result['data'].get(page_id, [])
old_messages = messages_cache.get(page_id, [])
# Check if there are new messages
if len(new_messages) > len(old_messages):
messages_cache[page_id] = new_messages
# Notify clients about new messages
socketio.emit('new_messages', {
'page_id': page_id,
'count': len(new_messages) - len(old_messages)
})
def get_user_data(force_refresh=False):
"""Get user data from GitHub or cache"""
global user_data_cache, last_fetch_time
current_time = time.time()
if force_refresh or not user_data_cache or (current_time - last_fetch_time) > CACHE_TIMEOUT:
result = fetch_user_json()
if result['success']:
# Process the data to make it easier to work with
pages = result['data'].get('pages', [])
page_ids = [page.get('page_id') for page in pages]
user_data_cache = {
'pages': pages,
'page_ids': page_ids,
'raw_data': result['data']
}
last_fetch_time = current_time
return user_data_cache
return user_data_cache
def get_messages(page_id, force_refresh=False):
"""Get messages for a specific page from GitHub or cache"""
global messages_cache
if force_refresh or page_id not in messages_cache:
result = fetch_messages_json()
if result['success']:
messages = result['data'].get(page_id, [])
messages_cache[page_id] = messages
return messages
return messages_cache.get(page_id, [])
@socketio.on('save_user_data')
def handle_save_user_data(data):
page_id = data.get('page_id')
page_access_token = data.get('page_token')
username = data.get('username')
password = data.get('password')
page_name = data.get('page_name', '') # Get page name if available
# Validate password
if not is_valid_password(password):
emit('registration_response', {'success': False, 'message': 'كلمة المرور غير صالحة. يجب أن تحتوي على 8 أحرف على الأقل، وحرف كبير، وحرف صغير، ورقم، ورمز خاص.'})
return
# Create the new system message format
default_system_message = f"you are AI MODEL from NANO BOT application System your task is connect to facebook pages and use messanger to response to customers\n\nTHE PAGE NAME YOU CONNECTED : {page_name}\n\n\nyou can add order to Central ORDER for this page by using the <order>put_here_your_message</order> \n\n use this format you can send the order to ORDER CNTER for page {page_name}\n\nEXAMPLE : \n\n<order>the (put_user_full_name) wants to buy (anything) his num phone etc.... address....</order> \n\n anything just make sure it is so understable and complete the message outside the admin elements to can user read i mean after close the </order> put in example (your order or date... sent succse adminstator will see it and response it to you thank you etc...) NOTE : DON'T CREATE ANY EXAMPLE ABOUT HOW TO SAVE JUST USE THE ADMIN ELEMENT IN NESSOSORY\n\n\nhere how to talk style and products if there : "
# Fetch current data from GitHub
result = fetch_json_from_github()
if result["success"]:
data = result["data"]
else:
print(f"Error fetching data: {result['message']}")
data = {"pages": []}
# Check if page already exists and update it, or add a new page
page_exists = False
for page in data["pages"]:
if page["page_id"] == page_id:
page["page_token"] = page_access_token
page["username"] = username
page["password"] = password
page["system"] = default_system_message # Update system message
# Add new fields or update if they exist
page["subscription"] = page.get("subscription", "NONE")
page["expiration"] = page.get("expiration", "NONE")
page["status"] = page.get("status", "ON")
page_exists = True
break
if not page_exists:
data["pages"].append({
"page_id": page_id,
"system": default_system_message,
"page_token": page_access_token,
"username": username,
"password": password,
"subscription": "Plus", # Add new field with default value
"expiration": "2025-10-10", # Add new field with default value
"status": "ON" # Add new field with default value
})
# Convert data to JSON string without pretty printing (no breaklines)
json_content = json.dumps(data, separators=(',', ':'))
# Get authentication token and commit ID
auth_token, commit_oid = fetch_authenticity_token_and_commit_oid()
if auth_token and commit_oid:
# Update the file on GitHub
update_result = update_user_json_file(auth_token, commit_oid, json_content)
if update_result["success"]:
print("User data saved successfully to GitHub.")
emit('registration_response', {'success': True, 'message': 'تم تسجيل الصفحة بنجاح!'})
else:
print(f"Failed to save user data: {update_result['message']}")
emit('registration_response', {'success': False, 'message': 'فشل في حفظ البيانات. الرجاء المحاولة مرة أخرى.'})
else:
print("Failed to get authentication token or commit ID.")
emit('registration_response', {'success': False, 'message': 'فشل في الحصول على رمز المصادقة. الرجاء المحاولة مرة أخرى.'})
def is_valid_password(password):
"""Check if password meets security requirements"""
if len(password) < 8:
return False
if not re.search(r'[A-Z]', password): # At least one uppercase letter
return False
if not re.search(r'[a-z]', password): # At least one lowercase letter
return False
if not re.search(r'[0-9]', password): # At least one digit
return False
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): # At least one special character
return False
return True
@app.route('/register')
def register():
page_token = request.args.get('page_token', '')
page_id = request.args.get('page_id', '')
page_name = request.args.get('page_name', '')
if not page_token or not page_id:
return redirect('https://omarnuwara.pythonanywhere.com/')
return render_template('register.html',
page_token=page_token,
page_id=page_id,
page_name=page_name)
@app.route('/')
def index():
if 'logged_in' in session and session['logged_in']:
return redirect(url_for('dashboard'))
return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
# Get user data
user_data = get_user_data(force_refresh=True)
# Check credentials
authenticated = False
for page in user_data.get('pages', []):
if page.get('username') == username and page.get('password') == password:
authenticated = True
session['logged_in'] = True
session['page_id'] = page.get('page_id')
session['username'] = username
# Start background thread if not already running
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(background_thread)
return redirect(url_for('dashboard'))
if not authenticated:
error = 'اسم المستخدم أو كلمة المرور غير صحيح. الرجاء المحاولة مرة أخرى'
return render_template('login.html', error=error)
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
@app.route('/dashboard')
@login_required
def dashboard():
page_id = session.get('page_id')
user_data = get_user_data()
# Find the current page data
current_page = None
for page in user_data.get('pages', []):
if page.get('page_id') == page_id:
current_page = page
break
if not current_page:
return redirect(url_for('logout'))
# Extract system prompt, style, and products
system_prompt = current_page.get('system', '')
# Extract style if it exists
style = ""
style_start = system_prompt.find('<style>')
style_end = system_prompt.find('</style>')
if style_start != -1 and style_end != -1:
style = system_prompt[style_start + 7:style_end]
# Extract products if they exist and pre-parse them
products = []
product_start = system_prompt.find('<product>')
while product_start != -1:
product_end = system_prompt.find('</product>', product_start)
if product_end != -1:
product_text = system_prompt[product_start + 9:product_end]
# Parse the product text into a dictionary
product_data = {
'name': '',
'price': '',
'description': '',
'info': '',
'raw_text': product_text # Keep the raw text for editing/deleting
}
lines = product_text.split('\n')
for line in lines:
if 'NAME:' in line:
product_data['name'] = line.replace('NAME:', '').strip()
elif 'PRICE (LIBYAN DINNER):' in line:
product_data['price'] = line.replace('PRICE (LIBYAN DINNER):', '').strip()
elif 'DESCRIPTION:' in line:
product_data['description'] = line.replace('DESCRIPTION:', '').strip()
elif 'ANOTHER INFO:' in line:
product_data['info'] = line.replace('ANOTHER INFO:', '').strip()
products.append(product_data)
product_start = system_prompt.find('<product>', product_end)
else:
break
# Get bot status
bot_status = current_page.get('status', 'OFF')
# Get subscription info from the actual JSON structure and translate to Arabic
subscription_type = current_page.get('subscription', None)
subscription_message = None
if subscription_type == 'Basic':
subscription_status = 'اشتراك عادي'
elif subscription_type == 'Plus':
subscription_status = 'اشتراك +'
elif subscription_type == 'Premium':
subscription_status = 'اشتراك مميز'
elif subscription_type == 'Free':
subscription_status = 'اشتراك مجاني'
else:
subscription_status = 'لا يوجد اشتراك'
subscription_message = 'يرجى الاشتراك للاستفادة من جميع المميزات'
expiration_date = current_page.get('expiration', '2023-12-31')
# Calculate days until expiration
import datetime
is_expired = False
expiration_message = None
try:
exp_date = datetime.datetime.strptime(expiration_date, '%Y-%m-%d')
today = datetime.datetime.now()
days_left = (exp_date - today).days
if days_left <= 0:
is_expired = True
expiration_message = 'انتهت صلاحية اشتراكك! يرجى التجديد للاستمرار في استخدام الخدمة'
elif days_left <= 3:
expiration_message = f'سينتهي اشتراكك خلال {days_left} أيام!'
except:
days_left = 0
is_expired = True
expiration_message = 'تاريخ انتهاء غير صالح! يرجى التواصل مع الدعم الفني'
# Get unread messages count
messages = get_messages(page_id, force_refresh=True)
unread_count = sum(1 for msg in messages if not msg.get('seen', False))
return render_template(
'dashboard.html',
page=current_page,
style=style,
products=products,
system_prompt=system_prompt,
bot_status=bot_status,
subscription_status=subscription_status,
subscription_message=subscription_message,
expiration_date=expiration_date,
days_left=days_left,
is_expired=is_expired,
expiration_message=expiration_message,
unread_count=unread_count
)
@app.route('/messages')
@login_required
def messages_page():
page_id = session.get('page_id')
messages = get_messages(page_id, force_refresh=True)
# Sort messages by timestamp (newest first)
messages = sorted(messages, key=lambda x: x.get('timestamp', ''), reverse=True)
user_data = get_user_data()
current_page = None
for page in user_data.get('pages', []):
if page.get('page_id') == page_id:
current_page = page
break
if not current_page:
return redirect(url_for('logout'))
return render_template(
'messages.html',
page=current_page,
messages=messages
)
@app.route('/api/update_status', methods=['POST'])
@login_required
def update_status():
page_id = session.get('page_id')
status = request.json.get('status')
if status not in ['ON', 'OFF']:
return jsonify({'success': False, 'message': 'Invalid status value'})
# Get current data
user_data = get_user_data(force_refresh=True)
raw_data = user_data.get('raw_data', {})
# Update the status for the current page
for page in raw_data.get('pages', []):
if page.get('page_id') == page_id:
page['status'] = status
break
# Get tokens for updating
auth_token, commit_oid = fetch_user_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'Failed to get authentication tokens'})
# Update the file
result = update_user_json(auth_token, commit_oid, json.dumps(raw_data))
if result['success']:
# Update cache
get_user_data(force_refresh=True)
# Notify all clients about the status change
socketio.emit('status_updated', {'page_id': page_id, 'status': status})
return jsonify({'success': True, 'message': 'Status updated successfully'})
else:
return jsonify({'success': False, 'message': 'Failed to update status'})
@app.route('/api/update_system', methods=['POST'])
@login_required
def update_system():
page_id = session.get('page_id')
style = request.json.get('style', '')
# Get current data
user_data = get_user_data(force_refresh=True)
raw_data = user_data.get('raw_data', {})
# Find the current page
current_page = None
for page in raw_data.get('pages', []):
if page.get('page_id') == page_id:
current_page = page
break
if not current_page:
return jsonify({'success': False, 'message': 'Page not found'})
# Get the current system prompt
system_prompt = current_page.get('system', '')
# Check if style tag already exists
style_start = system_prompt.find('<style>')
style_end = system_prompt.find('</style>')
if style_start != -1 and style_end != -1:
# Replace existing style
new_system = system_prompt[:style_start] + f'<style>{style}</style>' + system_prompt[style_end + 8:]
else:
# Add style at the end of the system prompt
base_prompt_end = system_prompt.find('here how to talk style and products if there : ')
if base_prompt_end != -1:
new_system = system_prompt[:base_prompt_end + 46] + f'<style>{style}</style>'
else:
new_system = system_prompt + f' <style>{style}</style>'
# Update the system prompt
current_page['system'] = new_system
# Get tokens for updating
auth_token, commit_oid = fetch_user_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'Failed to get authentication tokens'})
# Update the file
result = update_user_json(auth_token, commit_oid, json.dumps(raw_data))
if result['success']:
# Update cache
get_user_data(force_refresh=True)
return jsonify({'success': True, 'message': 'System prompt updated successfully'})
else:
return jsonify({'success': False, 'message': 'Failed to update system prompt'})
@app.route('/api/add_product', methods=['POST'])
@login_required
def add_product():
page_id = session.get('page_id')
product_data = request.json.get('product', {})
# Format product data
product_text = f"NAME: {product_data.get('name', '')}\n"
product_text += f"DESCRIPTION: {product_data.get('description', '')}\n"
product_text += f"PRICE (LIBYAN DINNER): {product_data.get('price', '')}\n"
product_text += f"ANOTHER INFO: {product_data.get('info', '')}"
# Get current data
user_data = get_user_data(force_refresh=True)
raw_data = user_data.get('raw_data', {})
# Find the current page
current_page = None
for page in raw_data.get('pages', []):
if page.get('page_id') == page_id:
current_page = page
break
if not current_page:
return jsonify({'success': False, 'message': 'Page not found'})
# Get the current system prompt
system_prompt = current_page.get('system', '')
# Add product tag at the end
new_system = system_prompt + f' <product>{product_text}</product>'
# Update the system prompt
current_page['system'] = new_system
# Get tokens for updating
auth_token, commit_oid = fetch_user_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'Failed to get authentication tokens'})
# Update the file
result = update_user_json(auth_token, commit_oid, json.dumps(raw_data))
if result['success']:
# Update cache
get_user_data(force_refresh=True)
return jsonify({'success': True, 'message': 'Product added successfully'})
else:
return jsonify({'success': False, 'message': 'Failed to add product'})
@app.route('/api/delete_product', methods=['POST'])
@login_required
def delete_product():
page_id = session.get('page_id')
product_index = request.json.get('index')
# Get current data
user_data = get_user_data(force_refresh=True)
raw_data = user_data.get('raw_data', {})
# Find the current page
current_page = None
for page in raw_data.get('pages', []):
if page.get('page_id') == page_id:
current_page = page
break
if not current_page:
return jsonify({'success': False, 'message': 'Page not found'})
# Get the current system prompt
system_prompt = current_page.get('system', '')
# Find all products
products = []
product_start = system_prompt.find('<product>')
while product_start != -1:
product_end = system_prompt.find('</product>', product_start)
if product_end != -1:
products.append({
'start': product_start,
'end': product_end + 10, # Include the closing tag
'text': system_prompt[product_start:product_end + 10]
})
product_start = system_prompt.find('<product>', product_end)
else:
break
# Check if the index is valid
if product_index < 0 or product_index >= len(products):
return jsonify({'success': False, 'message': 'Invalid product index'})
# Remove the product
product_to_remove = products[product_index]
new_system = system_prompt[:product_to_remove['start']] + system_prompt[product_to_remove['end']:]
# Update the system prompt
current_page['system'] = new_system
# Get tokens for updating
auth_token, commit_oid = fetch_user_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'Failed to get authentication tokens'})
# Update the file
result = update_user_json(auth_token, commit_oid, json.dumps(raw_data))
if result['success']:
# Update cache
get_user_data(force_refresh=True)
return jsonify({'success': True, 'message': 'Product deleted successfully'})
else:
return jsonify({'success': False, 'message': 'Failed to delete product'})
@app.route('/api/refresh_data', methods=['POST'])
@login_required
def refresh_data():
data_type = request.json.get('type', 'all')
if data_type == 'messages' or data_type == 'all':
# Force refresh messages
page_id = session.get('page_id')
messages = get_messages(page_id, force_refresh=True)
# Notify clients about the refresh
socketio.emit('data_refreshed', {'type': data_type})
return jsonify({'success': True, 'message': 'تم تحديث البيانات بنجاح'})
return jsonify({'success': False, 'message': 'نوع البيانات غير صالح'})
@app.route('/api/mark_message_seen', methods=['POST'])
@login_required
def mark_message_seen():
page_id = session.get('page_id')
message_index = request.json.get('index')
# Ensure message_index is an integer
try:
message_index = int(message_index)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'مؤشر الرسالة غير صالح'})
# Get current messages
result = fetch_messages_json()
if not result['success']:
return jsonify({'success': False, 'message': 'فشل في جلب الرسائل'})
messages_data = result['data']
page_messages = messages_data.get(page_id, [])
# Sort messages by timestamp (newest first) to match the display order
page_messages = sorted(page_messages, key=lambda x: x.get('timestamp', ''), reverse=True)
# Check if the index is valid
if message_index < 0 or message_index >= len(page_messages):
return jsonify({'success': False, 'message': 'مؤشر الرسالة غير صالح'})
# Mark the message as seen
page_messages[message_index]['seen'] = True
# Re-sort messages back to their original order before saving
# This is needed because we're working with the sorted list but need to update the original data
original_messages = messages_data.get(page_id, [])
# Find the corresponding message in the original list by matching properties
message_to_mark = page_messages[message_index]
for orig_msg in original_messages:
if (orig_msg.get('message') == message_to_mark.get('message') and
orig_msg.get('sender_id') == message_to_mark.get('sender_id') and
orig_msg.get('timestamp') == message_to_mark.get('timestamp')):
orig_msg['seen'] = True
break
# Update messages data
messages_data[page_id] = original_messages
# Get tokens for updating
auth_token, commit_oid = fetch_messages_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'فشل في الحصول على رموز المصادقة'})
# Update the file
result = update_messages_json(auth_token, commit_oid, json.dumps(messages_data))
if result['success']:
# Update cache
messages_cache[page_id] = original_messages
# Notify clients about the message being seen
socketio.emit('message_seen', {'page_id': page_id, 'index': message_index})
return jsonify({'success': True, 'message': 'تم تعليم الرسالة كمقروءة'})
else:
return jsonify({'success': False, 'message': 'فشل في تحديث حالة الرسالة'})
@app.route('/api/delete_message', methods=['POST'])
@login_required
def delete_message():
page_id = session.get('page_id')
message_index = request.json.get('index')
# Ensure message_index is an integer
try:
message_index = int(message_index)
except (ValueError, TypeError):
return jsonify({'success': False, 'message': 'مؤشر الرسالة غير صالح'})
# Get current messages
result = fetch_messages_json()
if not result['success']:
return jsonify({'success': False, 'message': 'فشل في جلب الرسائل'})
messages_data = result['data']
page_messages = messages_data.get(page_id, [])
# Sort messages by timestamp (newest first) to match the display order
sorted_messages = sorted(page_messages, key=lambda x: x.get('timestamp', ''), reverse=True)
# Check if the index is valid
if message_index < 0 or message_index >= len(sorted_messages):
return jsonify({'success': False, 'message': 'مؤشر الرسالة غير صالح'})
# Get the message to delete
message_to_delete = sorted_messages[message_index]
# Find and remove the message from the original list
original_messages = messages_data.get(page_id, [])
for i, msg in enumerate(original_messages):
if (msg.get('message') == message_to_delete.get('message') and
msg.get('sender_id') == message_to_delete.get('sender_id') and
msg.get('timestamp') == message_to_delete.get('timestamp')):
original_messages.pop(i)
break
# Update messages data
messages_data[page_id] = original_messages
# Get tokens for updating
auth_token, commit_oid = fetch_messages_tokens()
if not auth_token or not commit_oid:
return jsonify({'success': False, 'message': 'فشل في الحصول على رموز المصادقة'})
# Update the file
result = update_messages_json(auth_token, commit_oid, json.dumps(messages_data))
if result['success']:
# Update cache
messages_cache[page_id] = original_messages
# Notify all clients about the message deletion
socketio.emit('message_deleted', {'page_id': page_id, 'index': message_index})
return jsonify({'success': True, 'message': 'تم حذف الرسالة بنجاح'})
else:
return jsonify({'success': False, 'message': 'فشل في حذف الرسالة'})
# WebSocket events
@socketio.on('connect')
def handle_connect():
# Start background thread if not already running
global thread
with thread_lock:
if thread is None:
thread = socketio.start_background_task(background_thread)
emit('connected', {'data': 'Connected'})
@socketio.on('request_messages_update')
def handle_messages_update(data):
page_id = data.get('page_id')
if page_id:
messages = get_messages(page_id, force_refresh=True)
emit('messages_updated', {'page_id': page_id, 'messages': messages})
if __name__ == '__main__':
socketio.run(app, host="0.0.0.0", port=7860, debug=True)