v3 / modules /pricing /services /price_prediction.py
EGYADMIN's picture
Upload 115 files
82676b8 verified
"""
خدمة التنبؤ بالأسعار
"""
import pandas as pd
import numpy as np
import joblib
import os
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import config
class PricePrediction:
"""خدمة التنبؤ بالأسعار باستخدام التعلم الآلي"""
def __init__(self):
"""تهيئة خدمة التنبؤ بالأسعار"""
self.model_path = config.PRICE_PREDICTION_MODEL
self.model = self._load_model()
self.scaler = None
self.materials_data = self._load_materials_data()
self.market_indices = self._load_market_indices()
def _load_model(self):
"""تحميل نموذج التنبؤ المدرب مسبقاً"""
try:
if os.path.exists(self.model_path):
model = joblib.load(self.model_path)
return model
else:
# إذا لم يكن النموذج موجوداً، قم بإنشاء نموذج جديد
model = RandomForestRegressor(
n_estimators=100,
max_depth=15,
min_samples_split=5,
min_samples_leaf=2,
random_state=42
)
return model
except Exception as e:
print(f"خطأ في تحميل نموذج التنبؤ: {str(e)}")
return RandomForestRegressor(random_state=42)
def _load_materials_data(self):
"""تحميل بيانات المواد وأسعارها التاريخية"""
# محاكاة تحميل البيانات من مصدر بيانات
materials_data = {
'خرسانة': {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'سعر': [750, 740, 735, 730, 720, 715, 710, 700, 695, 690, 685, 680],
'وحدة': 'م3'
},
'حديد تسليح': {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'سعر': [5500, 5450, 5400, 5350, 5300, 5250, 5200, 5150, 5100, 5050, 5000, 4950],
'وحدة': 'طن'
},
'إسمنت': {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'سعر': [25, 25, 24.5, 24.5, 24, 24, 23.5, 23.5, 23, 23, 22.5, 22.5],
'وحدة': 'كيس'
},
'رمل': {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'سعر': [140, 140, 135, 135, 130, 130, 125, 125, 120, 120, 115, 115],
'وحدة': 'م3'
},
'بلوك خرساني': {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'سعر': [11, 11, 10.5, 10.5, 10, 10, 9.5, 9.5, 9, 9, 8.5, 8.5],
'وحدة': 'قطعة'
}
}
return materials_data
def _load_market_indices(self):
"""تحميل مؤشرات السوق المؤثرة على الأسعار"""
# محاكاة تحميل البيانات من مصدر بيانات
market_indices = {
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)],
'مؤشر_البناء': [105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94],
'مؤشر_النفط': [80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69],
'مؤشر_سعر_الصرف': [3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75],
'مؤشر_التضخم': [2.5, 2.4, 2.3, 2.2, 2.1, 2.0, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4]
}
return market_indices
def train(self, training_data=None):
"""
تدريب نموذج التنبؤ بالأسعار
المعلمات:
training_data: بيانات التدريب (اختياري)، إذا لم يتم توفيرها سيتم استخدام البيانات المتاحة
إرجاع:
مؤشرات أداء النموذج
"""
# تجهيز بيانات التدريب
if training_data is None:
# استخدام البيانات المتاحة لتوليد مجموعة تدريب
X, y = self._prepare_training_data()
else:
X, y = self._extract_features_target(training_data)
# تقسيم البيانات إلى تدريب واختبار
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
# تطبيع البيانات
self.scaler = StandardScaler()
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# تدريب النموذج
self.model.fit(X_train_scaled, y_train)
# تقييم النموذج
y_pred = self.model.predict(X_test_scaled)
# حساب مؤشرات الأداء
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
# حفظ النموذج
try:
joblib.dump(self.model, self.model_path)
joblib.dump(self.scaler, os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl'))
except Exception as e:
print(f"خطأ في حفظ النموذج: {str(e)}")
return {
'mae': mae,
'rmse': rmse,
'r2': r2
}
def _prepare_training_data(self):
"""تجهيز بيانات التدريب من البيانات المتاحة"""
# توليد بيانات تدريب افتراضية
data = []
target = []
# استخدام بيانات المواد وأسعارها التاريخية
for material_name, material_info in self.materials_data.items():
for i in range(len(material_info['تاريخ'])):
# استخراج المؤشرات في التاريخ المقابل
date_index = self.market_indices['تاريخ'].index(material_info['تاريخ'][i]) if material_info['تاريخ'][i] in self.market_indices['تاريخ'] else 0
# تكوين ميزات التدريب (المؤشرات السوقية والشهر)
features = [
material_info['تاريخ'][i].month, # الشهر
self.market_indices['مؤشر_البناء'][date_index],
self.market_indices['مؤشر_النفط'][date_index],
self.market_indices['مؤشر_سعر_الصرف'][date_index],
self.market_indices['مؤشر_التضخم'][date_index]
]
# إضافة معرّف للمادة (تمثيل رقمي)
material_id = list(self.materials_data.keys()).index(material_name)
features.append(material_id)
data.append(features)
target.append(material_info['سعر'][i])
# إضافة ضوضاء عشوائية لزيادة حجم البيانات
for _ in range(5):
noisy_features = features.copy()
for j in range(1, 5): # إضافة ضوضاء للمؤشرات فقط
noisy_features[j] += np.random.normal(0, 0.5)
noisy_price = material_info['سعر'][i] * (1 + np.random.normal(0, 0.02)) # ضوضاء 2%
data.append(noisy_features)
target.append(noisy_price)
return np.array(data), np.array(target)
def _extract_features_target(self, training_data):
"""استخراج الميزات والأهداف من بيانات التدريب"""
# استخراج الميزات والأهداف من البيانات المقدمة
features = []
target = []
for item in training_data:
features.append([
item['date'].month, # الشهر
item['building_index'],
item['oil_index'],
item['exchange_rate'],
item['inflation_rate'],
item['material_id']
])
target.append(item['price'])
return np.array(features), np.array(target)
def predict_prices(self, materials, prediction_date=None, market_conditions=None):
"""
التنبؤ بأسعار المواد
المعلمات:
materials: قائمة المواد المطلوب التنبؤ بأسعارها
prediction_date: تاريخ التنبؤ (اختياري)، إذا لم يتم توفيره سيتم استخدام التاريخ الحالي
market_conditions: ظروف السوق (اختياري)، إذا لم يتم توفيرها سيتم استخدام آخر قيم متاحة
إرجاع:
قاموس بأسعار المواد المتنبأ بها
"""
if prediction_date is None:
prediction_date = datetime.now()
if market_conditions is None:
# استخدام آخر قيم متاحة للمؤشرات
market_conditions = {
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0],
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0],
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0],
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0]
}
# التحقق من وجود المواد في البيانات
material_names = list(self.materials_data.keys())
valid_materials = [m for m in materials if m in material_names]
if not valid_materials:
return {}
# تحميل المعايير إذا كانت متوفرة
scaler_path = os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl')
if self.scaler is None and os.path.exists(scaler_path):
try:
self.scaler = joblib.load(scaler_path)
except Exception as e:
print(f"خطأ في تحميل المعايير: {str(e)}")
# إنشاء معايير جديدة
X, _ = self._prepare_training_data()
self.scaler = StandardScaler()
self.scaler.fit(X)
# إعداد ميزات التنبؤ
features = []
for material in valid_materials:
material_id = material_names.index(material)
material_features = [
prediction_date.month, # الشهر
market_conditions['مؤشر_البناء'],
market_conditions['مؤشر_النفط'],
market_conditions['مؤشر_سعر_الصرف'],
market_conditions['مؤشر_التضخم'],
material_id
]
features.append(material_features)
# تطبيع الميزات
if self.scaler is not None:
features_scaled = self.scaler.transform(features)
else:
features_scaled = features
# التنبؤ بالأسعار
predicted_prices = self.model.predict(features_scaled)
# إرجاع النتائج
results = {}
for i, material in enumerate(valid_materials):
# تطبيق عامل تصحيح (2% عشوائية)
correction_factor = 1.0 + np.random.uniform(-0.02, 0.02)
price = max(0, predicted_prices[i] * correction_factor)
results[material] = {
'سعر': price,
'وحدة': self.materials_data[material]['وحدة'],
'تاريخ_التنبؤ': prediction_date.strftime('%Y-%m-%d'),
'هامش_الخطأ': '±5%' # تقدير هامش الخطأ
}
return results
def get_price_trends(self, material, periods=6):
"""
الحصول على اتجاهات الأسعار المستقبلية
المعلمات:
material: المادة المطلوب التنبؤ باتجاهات أسعارها
periods: عدد الفترات المستقبلية (الشهور)
إرجاع:
قائمة بالأسعار المتوقعة للفترات المستقبلية
"""
if material not in self.materials_data:
return []
# الحصول على التاريخ الحالي
current_date = datetime.now()
# التنبؤ بالأسعار للفترات المستقبلية
price_trends = []
for i in range(periods):
prediction_date = current_date + timedelta(days=30 * (i + 1))
# افتراض تغيرات طفيفة في المؤشرات مع مرور الوقت
market_conditions = {
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0] * (1 + 0.01 * i), # زيادة 1% شهرياً
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0] * (1 + 0.005 * i), # زيادة 0.5% شهرياً
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], # ثابت
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] * (1 + 0.01 * i) # زيادة 1% شهرياً
}
# التنبؤ بالسعر
predicted_price = self.predict_prices([material], prediction_date, market_conditions)
price_trends.append({
'تاريخ': prediction_date.strftime('%Y-%m'),
'سعر': predicted_price[material]['سعر'] if material in predicted_price else 0
})
return price_trends
def analyze_factors(self, material):
"""
تحليل العوامل المؤثرة على سعر المادة
المعلمات:
material: المادة المطلوب تحليلها
إرجاع:
قاموس بالعوامل المؤثرة وأهميتها النسبية
"""
if material not in self.materials_data or not hasattr(self.model, 'feature_importances_'):
return {}
# الحصول على أهمية الميزات من النموذج
feature_importances = self.model.feature_importances_
# أسماء الميزات
feature_names = ['الشهر', 'مؤشر البناء', 'مؤشر النفط', 'سعر الصرف', 'معدل التضخم', 'نوع المادة']
# ترتيب الميزات حسب الأهمية
importance_pairs = [(name, importance) for name, importance in zip(feature_names, feature_importances)]
importance_pairs.sort(key=lambda x: x[1], reverse=True)
# إرجاع العوامل المؤثرة وأهميتها
factors = {}
for name, importance in importance_pairs:
factors[name] = round(importance * 100, 2) # تحويل إلى نسبة مئوية
return {
'العوامل_المؤثرة': factors,
'المادة': material,
'وحدة': self.materials_data[material]['وحدة'],
'سعر_حالي': self.materials_data[material]['سعر'][0],
'اتجاه_السعر': self._get_price_trend(material)
}
def _get_price_trend(self, material):
"""تحديد اتجاه سعر المادة بناءً على البيانات التاريخية"""
if material not in self.materials_data:
return "غير معروف"
prices = self.materials_data[material]['سعر']
if len(prices) < 2:
return "غير معروف"
# حساب متوسط التغير الشهري
price_changes = [(prices[i] - prices[i+1]) / prices[i+1] * 100 for i in range(len(prices)-1)]
avg_monthly_change = sum(price_changes) / len(price_changes)
if avg_monthly_change > 1:
return "ارتفاع حاد"
elif avg_monthly_change > 0.2:
return "ارتفاع معتدل"
elif avg_monthly_change > -0.2:
return "استقرار"
elif avg_monthly_change > -1:
return "انخفاض معتدل"
else:
return "انخفاض حاد"
def export_price_forecast(self, materials, periods=6, output_file=None):
"""
تصدير توقعات الأسعار إلى ملف
المعلمات:
materials: قائمة المواد المطلوب التنبؤ بأسعارها
periods: عدد الفترات المستقبلية (الشهور)
output_file: مسار ملف الإخراج (اختياري)
إرجاع:
مسار الملف المصدر أو البيانات مباشرة إذا لم يتم تحديد ملف
"""
# التحقق من وجود المواد في البيانات
valid_materials = [m for m in materials if m in self.materials_data]
if not valid_materials:
return None
# إعداد بيانات التوقعات
forecast_data = []
for material in valid_materials:
# الحصول على اتجاهات الأسعار
price_trends = self.get_price_trends(material, periods)
for trend in price_trends:
forecast_data.append({
'المادة': material,
'الوحدة': self.materials_data[material]['وحدة'],
'التاريخ': trend['تاريخ'],
'السعر المتوقع': trend['سعر'],
'هامش الخطأ': '±5%'
})
# تحويل البيانات إلى DataFrame
forecast_df = pd.DataFrame(forecast_data)
# تصدير البيانات إلى ملف إذا تم تحديده
if output_file:
try:
ext = os.path.splitext(output_file)[1].lower()
if ext == '.csv':
forecast_df.to_csv(output_file, index=False, encoding='utf-8-sig')
elif ext in ['.xlsx', '.xls']:
forecast_df.to_excel(output_file, index=False)
elif ext == '.json':
forecast_df.to_json(output_file, orient='records', force_ascii=False)
else:
print(f"تنسيق غير مدعوم: {ext}")
return None
return output_file
except Exception as e:
print(f"خطأ في تصدير توقعات الأسعار: {str(e)}")
return None
return forecast_df