|
""" |
|
خدمة التنبؤ بالأسعار |
|
""" |
|
|
|
import pandas as pd |
|
import numpy as np |
|
import joblib |
|
import os |
|
from datetime import datetime, timedelta |
|
from sklearn.ensemble import RandomForestRegressor |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.preprocessing import StandardScaler |
|
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
|
|
|
import config |
|
|
|
|
|
class PricePrediction: |
|
"""خدمة التنبؤ بالأسعار باستخدام التعلم الآلي""" |
|
|
|
def __init__(self): |
|
"""تهيئة خدمة التنبؤ بالأسعار""" |
|
self.model_path = config.PRICE_PREDICTION_MODEL |
|
self.model = self._load_model() |
|
self.scaler = None |
|
self.materials_data = self._load_materials_data() |
|
self.market_indices = self._load_market_indices() |
|
|
|
def _load_model(self): |
|
"""تحميل نموذج التنبؤ المدرب مسبقاً""" |
|
try: |
|
if os.path.exists(self.model_path): |
|
model = joblib.load(self.model_path) |
|
return model |
|
else: |
|
|
|
model = RandomForestRegressor( |
|
n_estimators=100, |
|
max_depth=15, |
|
min_samples_split=5, |
|
min_samples_leaf=2, |
|
random_state=42 |
|
) |
|
return model |
|
except Exception as e: |
|
print(f"خطأ في تحميل نموذج التنبؤ: {str(e)}") |
|
return RandomForestRegressor(random_state=42) |
|
|
|
def _load_materials_data(self): |
|
"""تحميل بيانات المواد وأسعارها التاريخية""" |
|
|
|
materials_data = { |
|
'خرسانة': { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'سعر': [750, 740, 735, 730, 720, 715, 710, 700, 695, 690, 685, 680], |
|
'وحدة': 'م3' |
|
}, |
|
'حديد تسليح': { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'سعر': [5500, 5450, 5400, 5350, 5300, 5250, 5200, 5150, 5100, 5050, 5000, 4950], |
|
'وحدة': 'طن' |
|
}, |
|
'إسمنت': { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'سعر': [25, 25, 24.5, 24.5, 24, 24, 23.5, 23.5, 23, 23, 22.5, 22.5], |
|
'وحدة': 'كيس' |
|
}, |
|
'رمل': { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'سعر': [140, 140, 135, 135, 130, 130, 125, 125, 120, 120, 115, 115], |
|
'وحدة': 'م3' |
|
}, |
|
'بلوك خرساني': { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'سعر': [11, 11, 10.5, 10.5, 10, 10, 9.5, 9.5, 9, 9, 8.5, 8.5], |
|
'وحدة': 'قطعة' |
|
} |
|
} |
|
return materials_data |
|
|
|
def _load_market_indices(self): |
|
"""تحميل مؤشرات السوق المؤثرة على الأسعار""" |
|
|
|
market_indices = { |
|
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], |
|
'مؤشر_البناء': [105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94], |
|
'مؤشر_النفط': [80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69], |
|
'مؤشر_سعر_الصرف': [3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75], |
|
'مؤشر_التضخم': [2.5, 2.4, 2.3, 2.2, 2.1, 2.0, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4] |
|
} |
|
return market_indices |
|
|
|
def train(self, training_data=None): |
|
""" |
|
تدريب نموذج التنبؤ بالأسعار |
|
|
|
المعلمات: |
|
training_data: بيانات التدريب (اختياري)، إذا لم يتم توفيرها سيتم استخدام البيانات المتاحة |
|
|
|
إرجاع: |
|
مؤشرات أداء النموذج |
|
""" |
|
|
|
if training_data is None: |
|
|
|
X, y = self._prepare_training_data() |
|
else: |
|
X, y = self._extract_features_target(training_data) |
|
|
|
|
|
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42) |
|
|
|
|
|
self.scaler = StandardScaler() |
|
X_train_scaled = self.scaler.fit_transform(X_train) |
|
X_test_scaled = self.scaler.transform(X_test) |
|
|
|
|
|
self.model.fit(X_train_scaled, y_train) |
|
|
|
|
|
y_pred = self.model.predict(X_test_scaled) |
|
|
|
|
|
mae = mean_absolute_error(y_test, y_pred) |
|
rmse = np.sqrt(mean_squared_error(y_test, y_pred)) |
|
r2 = r2_score(y_test, y_pred) |
|
|
|
|
|
try: |
|
joblib.dump(self.model, self.model_path) |
|
joblib.dump(self.scaler, os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl')) |
|
except Exception as e: |
|
print(f"خطأ في حفظ النموذج: {str(e)}") |
|
|
|
return { |
|
'mae': mae, |
|
'rmse': rmse, |
|
'r2': r2 |
|
} |
|
|
|
def _prepare_training_data(self): |
|
"""تجهيز بيانات التدريب من البيانات المتاحة""" |
|
|
|
data = [] |
|
target = [] |
|
|
|
|
|
for material_name, material_info in self.materials_data.items(): |
|
for i in range(len(material_info['تاريخ'])): |
|
|
|
date_index = self.market_indices['تاريخ'].index(material_info['تاريخ'][i]) if material_info['تاريخ'][i] in self.market_indices['تاريخ'] else 0 |
|
|
|
|
|
features = [ |
|
material_info['تاريخ'][i].month, |
|
self.market_indices['مؤشر_البناء'][date_index], |
|
self.market_indices['مؤشر_النفط'][date_index], |
|
self.market_indices['مؤشر_سعر_الصرف'][date_index], |
|
self.market_indices['مؤشر_التضخم'][date_index] |
|
] |
|
|
|
|
|
material_id = list(self.materials_data.keys()).index(material_name) |
|
features.append(material_id) |
|
|
|
data.append(features) |
|
target.append(material_info['سعر'][i]) |
|
|
|
|
|
for _ in range(5): |
|
noisy_features = features.copy() |
|
for j in range(1, 5): |
|
noisy_features[j] += np.random.normal(0, 0.5) |
|
|
|
noisy_price = material_info['سعر'][i] * (1 + np.random.normal(0, 0.02)) |
|
|
|
data.append(noisy_features) |
|
target.append(noisy_price) |
|
|
|
return np.array(data), np.array(target) |
|
|
|
def _extract_features_target(self, training_data): |
|
"""استخراج الميزات والأهداف من بيانات التدريب""" |
|
|
|
features = [] |
|
target = [] |
|
|
|
for item in training_data: |
|
features.append([ |
|
item['date'].month, |
|
item['building_index'], |
|
item['oil_index'], |
|
item['exchange_rate'], |
|
item['inflation_rate'], |
|
item['material_id'] |
|
]) |
|
target.append(item['price']) |
|
|
|
return np.array(features), np.array(target) |
|
|
|
def predict_prices(self, materials, prediction_date=None, market_conditions=None): |
|
""" |
|
التنبؤ بأسعار المواد |
|
|
|
المعلمات: |
|
materials: قائمة المواد المطلوب التنبؤ بأسعارها |
|
prediction_date: تاريخ التنبؤ (اختياري)، إذا لم يتم توفيره سيتم استخدام التاريخ الحالي |
|
market_conditions: ظروف السوق (اختياري)، إذا لم يتم توفيرها سيتم استخدام آخر قيم متاحة |
|
|
|
إرجاع: |
|
قاموس بأسعار المواد المتنبأ بها |
|
""" |
|
if prediction_date is None: |
|
prediction_date = datetime.now() |
|
|
|
if market_conditions is None: |
|
|
|
market_conditions = { |
|
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0], |
|
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0], |
|
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], |
|
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] |
|
} |
|
|
|
|
|
material_names = list(self.materials_data.keys()) |
|
valid_materials = [m for m in materials if m in material_names] |
|
|
|
if not valid_materials: |
|
return {} |
|
|
|
|
|
scaler_path = os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl') |
|
if self.scaler is None and os.path.exists(scaler_path): |
|
try: |
|
self.scaler = joblib.load(scaler_path) |
|
except Exception as e: |
|
print(f"خطأ في تحميل المعايير: {str(e)}") |
|
|
|
X, _ = self._prepare_training_data() |
|
self.scaler = StandardScaler() |
|
self.scaler.fit(X) |
|
|
|
|
|
features = [] |
|
for material in valid_materials: |
|
material_id = material_names.index(material) |
|
|
|
material_features = [ |
|
prediction_date.month, |
|
market_conditions['مؤشر_البناء'], |
|
market_conditions['مؤشر_النفط'], |
|
market_conditions['مؤشر_سعر_الصرف'], |
|
market_conditions['مؤشر_التضخم'], |
|
material_id |
|
] |
|
|
|
features.append(material_features) |
|
|
|
|
|
if self.scaler is not None: |
|
features_scaled = self.scaler.transform(features) |
|
else: |
|
features_scaled = features |
|
|
|
|
|
predicted_prices = self.model.predict(features_scaled) |
|
|
|
|
|
results = {} |
|
for i, material in enumerate(valid_materials): |
|
|
|
correction_factor = 1.0 + np.random.uniform(-0.02, 0.02) |
|
price = max(0, predicted_prices[i] * correction_factor) |
|
|
|
results[material] = { |
|
'سعر': price, |
|
'وحدة': self.materials_data[material]['وحدة'], |
|
'تاريخ_التنبؤ': prediction_date.strftime('%Y-%m-%d'), |
|
'هامش_الخطأ': '±5%' |
|
} |
|
|
|
return results |
|
|
|
def get_price_trends(self, material, periods=6): |
|
""" |
|
الحصول على اتجاهات الأسعار المستقبلية |
|
|
|
المعلمات: |
|
material: المادة المطلوب التنبؤ باتجاهات أسعارها |
|
periods: عدد الفترات المستقبلية (الشهور) |
|
|
|
إرجاع: |
|
قائمة بالأسعار المتوقعة للفترات المستقبلية |
|
""" |
|
if material not in self.materials_data: |
|
return [] |
|
|
|
|
|
current_date = datetime.now() |
|
|
|
|
|
price_trends = [] |
|
|
|
for i in range(periods): |
|
prediction_date = current_date + timedelta(days=30 * (i + 1)) |
|
|
|
|
|
market_conditions = { |
|
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0] * (1 + 0.01 * i), |
|
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0] * (1 + 0.005 * i), |
|
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], |
|
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] * (1 + 0.01 * i) |
|
} |
|
|
|
|
|
predicted_price = self.predict_prices([material], prediction_date, market_conditions) |
|
|
|
price_trends.append({ |
|
'تاريخ': prediction_date.strftime('%Y-%m'), |
|
'سعر': predicted_price[material]['سعر'] if material in predicted_price else 0 |
|
}) |
|
|
|
return price_trends |
|
|
|
def analyze_factors(self, material): |
|
""" |
|
تحليل العوامل المؤثرة على سعر المادة |
|
|
|
المعلمات: |
|
material: المادة المطلوب تحليلها |
|
|
|
إرجاع: |
|
قاموس بالعوامل المؤثرة وأهميتها النسبية |
|
""" |
|
if material not in self.materials_data or not hasattr(self.model, 'feature_importances_'): |
|
return {} |
|
|
|
|
|
feature_importances = self.model.feature_importances_ |
|
|
|
|
|
feature_names = ['الشهر', 'مؤشر البناء', 'مؤشر النفط', 'سعر الصرف', 'معدل التضخم', 'نوع المادة'] |
|
|
|
|
|
importance_pairs = [(name, importance) for name, importance in zip(feature_names, feature_importances)] |
|
importance_pairs.sort(key=lambda x: x[1], reverse=True) |
|
|
|
|
|
factors = {} |
|
for name, importance in importance_pairs: |
|
factors[name] = round(importance * 100, 2) |
|
|
|
return { |
|
'العوامل_المؤثرة': factors, |
|
'المادة': material, |
|
'وحدة': self.materials_data[material]['وحدة'], |
|
'سعر_حالي': self.materials_data[material]['سعر'][0], |
|
'اتجاه_السعر': self._get_price_trend(material) |
|
} |
|
|
|
def _get_price_trend(self, material): |
|
"""تحديد اتجاه سعر المادة بناءً على البيانات التاريخية""" |
|
if material not in self.materials_data: |
|
return "غير معروف" |
|
|
|
prices = self.materials_data[material]['سعر'] |
|
if len(prices) < 2: |
|
return "غير معروف" |
|
|
|
|
|
price_changes = [(prices[i] - prices[i+1]) / prices[i+1] * 100 for i in range(len(prices)-1)] |
|
avg_monthly_change = sum(price_changes) / len(price_changes) |
|
|
|
if avg_monthly_change > 1: |
|
return "ارتفاع حاد" |
|
elif avg_monthly_change > 0.2: |
|
return "ارتفاع معتدل" |
|
elif avg_monthly_change > -0.2: |
|
return "استقرار" |
|
elif avg_monthly_change > -1: |
|
return "انخفاض معتدل" |
|
else: |
|
return "انخفاض حاد" |
|
|
|
def export_price_forecast(self, materials, periods=6, output_file=None): |
|
""" |
|
تصدير توقعات الأسعار إلى ملف |
|
|
|
المعلمات: |
|
materials: قائمة المواد المطلوب التنبؤ بأسعارها |
|
periods: عدد الفترات المستقبلية (الشهور) |
|
output_file: مسار ملف الإخراج (اختياري) |
|
|
|
إرجاع: |
|
مسار الملف المصدر أو البيانات مباشرة إذا لم يتم تحديد ملف |
|
""" |
|
|
|
valid_materials = [m for m in materials if m in self.materials_data] |
|
|
|
if not valid_materials: |
|
return None |
|
|
|
|
|
forecast_data = [] |
|
|
|
for material in valid_materials: |
|
|
|
price_trends = self.get_price_trends(material, periods) |
|
|
|
for trend in price_trends: |
|
forecast_data.append({ |
|
'المادة': material, |
|
'الوحدة': self.materials_data[material]['وحدة'], |
|
'التاريخ': trend['تاريخ'], |
|
'السعر المتوقع': trend['سعر'], |
|
'هامش الخطأ': '±5%' |
|
}) |
|
|
|
|
|
forecast_df = pd.DataFrame(forecast_data) |
|
|
|
|
|
if output_file: |
|
try: |
|
ext = os.path.splitext(output_file)[1].lower() |
|
|
|
if ext == '.csv': |
|
forecast_df.to_csv(output_file, index=False, encoding='utf-8-sig') |
|
elif ext in ['.xlsx', '.xls']: |
|
forecast_df.to_excel(output_file, index=False) |
|
elif ext == '.json': |
|
forecast_df.to_json(output_file, orient='records', force_ascii=False) |
|
else: |
|
print(f"تنسيق غير مدعوم: {ext}") |
|
return None |
|
|
|
return output_file |
|
except Exception as e: |
|
print(f"خطأ في تصدير توقعات الأسعار: {str(e)}") |
|
return None |
|
|
|
return forecast_df |