""" خدمة التنبؤ بالأسعار """ import pandas as pd import numpy as np import joblib import os from datetime import datetime, timedelta from sklearn.ensemble import RandomForestRegressor from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score import config class PricePrediction: """خدمة التنبؤ بالأسعار باستخدام التعلم الآلي""" def __init__(self): """تهيئة خدمة التنبؤ بالأسعار""" self.model_path = config.PRICE_PREDICTION_MODEL self.model = self._load_model() self.scaler = None self.materials_data = self._load_materials_data() self.market_indices = self._load_market_indices() def _load_model(self): """تحميل نموذج التنبؤ المدرب مسبقاً""" try: if os.path.exists(self.model_path): model = joblib.load(self.model_path) return model else: # إذا لم يكن النموذج موجوداً، قم بإنشاء نموذج جديد model = RandomForestRegressor( n_estimators=100, max_depth=15, min_samples_split=5, min_samples_leaf=2, random_state=42 ) return model except Exception as e: print(f"خطأ في تحميل نموذج التنبؤ: {str(e)}") return RandomForestRegressor(random_state=42) def _load_materials_data(self): """تحميل بيانات المواد وأسعارها التاريخية""" # محاكاة تحميل البيانات من مصدر بيانات materials_data = { 'خرسانة': { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'سعر': [750, 740, 735, 730, 720, 715, 710, 700, 695, 690, 685, 680], 'وحدة': 'م3' }, 'حديد تسليح': { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'سعر': [5500, 5450, 5400, 5350, 5300, 5250, 5200, 5150, 5100, 5050, 5000, 4950], 'وحدة': 'طن' }, 'إسمنت': { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'سعر': [25, 25, 24.5, 24.5, 24, 24, 23.5, 23.5, 23, 23, 22.5, 22.5], 'وحدة': 'كيس' }, 'رمل': { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'سعر': [140, 140, 135, 135, 130, 130, 125, 125, 120, 120, 115, 115], 'وحدة': 'م3' }, 'بلوك خرساني': { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'سعر': [11, 11, 10.5, 10.5, 10, 10, 9.5, 9.5, 9, 9, 8.5, 8.5], 'وحدة': 'قطعة' } } return materials_data def _load_market_indices(self): """تحميل مؤشرات السوق المؤثرة على الأسعار""" # محاكاة تحميل البيانات من مصدر بيانات market_indices = { 'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], 'مؤشر_البناء': [105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94], 'مؤشر_النفط': [80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69], 'مؤشر_سعر_الصرف': [3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75], 'مؤشر_التضخم': [2.5, 2.4, 2.3, 2.2, 2.1, 2.0, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4] } return market_indices def train(self, training_data=None): """ تدريب نموذج التنبؤ بالأسعار المعلمات: training_data: بيانات التدريب (اختياري)، إذا لم يتم توفيرها سيتم استخدام البيانات المتاحة إرجاع: مؤشرات أداء النموذج """ # تجهيز بيانات التدريب if training_data is None: # استخدام البيانات المتاحة لتوليد مجموعة تدريب X, y = self._prepare_training_data() else: X, y = self._extract_features_target(training_data) # تقسيم البيانات إلى تدريب واختبار X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42) # تطبيع البيانات self.scaler = StandardScaler() X_train_scaled = self.scaler.fit_transform(X_train) X_test_scaled = self.scaler.transform(X_test) # تدريب النموذج self.model.fit(X_train_scaled, y_train) # تقييم النموذج y_pred = self.model.predict(X_test_scaled) # حساب مؤشرات الأداء mae = mean_absolute_error(y_test, y_pred) rmse = np.sqrt(mean_squared_error(y_test, y_pred)) r2 = r2_score(y_test, y_pred) # حفظ النموذج try: joblib.dump(self.model, self.model_path) joblib.dump(self.scaler, os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl')) except Exception as e: print(f"خطأ في حفظ النموذج: {str(e)}") return { 'mae': mae, 'rmse': rmse, 'r2': r2 } def _prepare_training_data(self): """تجهيز بيانات التدريب من البيانات المتاحة""" # توليد بيانات تدريب افتراضية data = [] target = [] # استخدام بيانات المواد وأسعارها التاريخية for material_name, material_info in self.materials_data.items(): for i in range(len(material_info['تاريخ'])): # استخراج المؤشرات في التاريخ المقابل date_index = self.market_indices['تاريخ'].index(material_info['تاريخ'][i]) if material_info['تاريخ'][i] in self.market_indices['تاريخ'] else 0 # تكوين ميزات التدريب (المؤشرات السوقية والشهر) features = [ material_info['تاريخ'][i].month, # الشهر self.market_indices['مؤشر_البناء'][date_index], self.market_indices['مؤشر_النفط'][date_index], self.market_indices['مؤشر_سعر_الصرف'][date_index], self.market_indices['مؤشر_التضخم'][date_index] ] # إضافة معرّف للمادة (تمثيل رقمي) material_id = list(self.materials_data.keys()).index(material_name) features.append(material_id) data.append(features) target.append(material_info['سعر'][i]) # إضافة ضوضاء عشوائية لزيادة حجم البيانات for _ in range(5): noisy_features = features.copy() for j in range(1, 5): # إضافة ضوضاء للمؤشرات فقط noisy_features[j] += np.random.normal(0, 0.5) noisy_price = material_info['سعر'][i] * (1 + np.random.normal(0, 0.02)) # ضوضاء 2% data.append(noisy_features) target.append(noisy_price) return np.array(data), np.array(target) def _extract_features_target(self, training_data): """استخراج الميزات والأهداف من بيانات التدريب""" # استخراج الميزات والأهداف من البيانات المقدمة features = [] target = [] for item in training_data: features.append([ item['date'].month, # الشهر item['building_index'], item['oil_index'], item['exchange_rate'], item['inflation_rate'], item['material_id'] ]) target.append(item['price']) return np.array(features), np.array(target) def predict_prices(self, materials, prediction_date=None, market_conditions=None): """ التنبؤ بأسعار المواد المعلمات: materials: قائمة المواد المطلوب التنبؤ بأسعارها prediction_date: تاريخ التنبؤ (اختياري)، إذا لم يتم توفيره سيتم استخدام التاريخ الحالي market_conditions: ظروف السوق (اختياري)، إذا لم يتم توفيرها سيتم استخدام آخر قيم متاحة إرجاع: قاموس بأسعار المواد المتنبأ بها """ if prediction_date is None: prediction_date = datetime.now() if market_conditions is None: # استخدام آخر قيم متاحة للمؤشرات market_conditions = { 'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0], 'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0], 'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], 'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] } # التحقق من وجود المواد في البيانات material_names = list(self.materials_data.keys()) valid_materials = [m for m in materials if m in material_names] if not valid_materials: return {} # تحميل المعايير إذا كانت متوفرة scaler_path = os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl') if self.scaler is None and os.path.exists(scaler_path): try: self.scaler = joblib.load(scaler_path) except Exception as e: print(f"خطأ في تحميل المعايير: {str(e)}") # إنشاء معايير جديدة X, _ = self._prepare_training_data() self.scaler = StandardScaler() self.scaler.fit(X) # إعداد ميزات التنبؤ features = [] for material in valid_materials: material_id = material_names.index(material) material_features = [ prediction_date.month, # الشهر market_conditions['مؤشر_البناء'], market_conditions['مؤشر_النفط'], market_conditions['مؤشر_سعر_الصرف'], market_conditions['مؤشر_التضخم'], material_id ] features.append(material_features) # تطبيع الميزات if self.scaler is not None: features_scaled = self.scaler.transform(features) else: features_scaled = features # التنبؤ بالأسعار predicted_prices = self.model.predict(features_scaled) # إرجاع النتائج results = {} for i, material in enumerate(valid_materials): # تطبيق عامل تصحيح (2% عشوائية) correction_factor = 1.0 + np.random.uniform(-0.02, 0.02) price = max(0, predicted_prices[i] * correction_factor) results[material] = { 'سعر': price, 'وحدة': self.materials_data[material]['وحدة'], 'تاريخ_التنبؤ': prediction_date.strftime('%Y-%m-%d'), 'هامش_الخطأ': '±5%' # تقدير هامش الخطأ } return results def get_price_trends(self, material, periods=6): """ الحصول على اتجاهات الأسعار المستقبلية المعلمات: material: المادة المطلوب التنبؤ باتجاهات أسعارها periods: عدد الفترات المستقبلية (الشهور) إرجاع: قائمة بالأسعار المتوقعة للفترات المستقبلية """ if material not in self.materials_data: return [] # الحصول على التاريخ الحالي current_date = datetime.now() # التنبؤ بالأسعار للفترات المستقبلية price_trends = [] for i in range(periods): prediction_date = current_date + timedelta(days=30 * (i + 1)) # افتراض تغيرات طفيفة في المؤشرات مع مرور الوقت market_conditions = { 'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0] * (1 + 0.01 * i), # زيادة 1% شهرياً 'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0] * (1 + 0.005 * i), # زيادة 0.5% شهرياً 'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], # ثابت 'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] * (1 + 0.01 * i) # زيادة 1% شهرياً } # التنبؤ بالسعر predicted_price = self.predict_prices([material], prediction_date, market_conditions) price_trends.append({ 'تاريخ': prediction_date.strftime('%Y-%m'), 'سعر': predicted_price[material]['سعر'] if material in predicted_price else 0 }) return price_trends def analyze_factors(self, material): """ تحليل العوامل المؤثرة على سعر المادة المعلمات: material: المادة المطلوب تحليلها إرجاع: قاموس بالعوامل المؤثرة وأهميتها النسبية """ if material not in self.materials_data or not hasattr(self.model, 'feature_importances_'): return {} # الحصول على أهمية الميزات من النموذج feature_importances = self.model.feature_importances_ # أسماء الميزات feature_names = ['الشهر', 'مؤشر البناء', 'مؤشر النفط', 'سعر الصرف', 'معدل التضخم', 'نوع المادة'] # ترتيب الميزات حسب الأهمية importance_pairs = [(name, importance) for name, importance in zip(feature_names, feature_importances)] importance_pairs.sort(key=lambda x: x[1], reverse=True) # إرجاع العوامل المؤثرة وأهميتها factors = {} for name, importance in importance_pairs: factors[name] = round(importance * 100, 2) # تحويل إلى نسبة مئوية return { 'العوامل_المؤثرة': factors, 'المادة': material, 'وحدة': self.materials_data[material]['وحدة'], 'سعر_حالي': self.materials_data[material]['سعر'][0], 'اتجاه_السعر': self._get_price_trend(material) } def _get_price_trend(self, material): """تحديد اتجاه سعر المادة بناءً على البيانات التاريخية""" if material not in self.materials_data: return "غير معروف" prices = self.materials_data[material]['سعر'] if len(prices) < 2: return "غير معروف" # حساب متوسط التغير الشهري price_changes = [(prices[i] - prices[i+1]) / prices[i+1] * 100 for i in range(len(prices)-1)] avg_monthly_change = sum(price_changes) / len(price_changes) if avg_monthly_change > 1: return "ارتفاع حاد" elif avg_monthly_change > 0.2: return "ارتفاع معتدل" elif avg_monthly_change > -0.2: return "استقرار" elif avg_monthly_change > -1: return "انخفاض معتدل" else: return "انخفاض حاد" def export_price_forecast(self, materials, periods=6, output_file=None): """ تصدير توقعات الأسعار إلى ملف المعلمات: materials: قائمة المواد المطلوب التنبؤ بأسعارها periods: عدد الفترات المستقبلية (الشهور) output_file: مسار ملف الإخراج (اختياري) إرجاع: مسار الملف المصدر أو البيانات مباشرة إذا لم يتم تحديد ملف """ # التحقق من وجود المواد في البيانات valid_materials = [m for m in materials if m in self.materials_data] if not valid_materials: return None # إعداد بيانات التوقعات forecast_data = [] for material in valid_materials: # الحصول على اتجاهات الأسعار price_trends = self.get_price_trends(material, periods) for trend in price_trends: forecast_data.append({ 'المادة': material, 'الوحدة': self.materials_data[material]['وحدة'], 'التاريخ': trend['تاريخ'], 'السعر المتوقع': trend['سعر'], 'هامش الخطأ': '±5%' }) # تحويل البيانات إلى DataFrame forecast_df = pd.DataFrame(forecast_data) # تصدير البيانات إلى ملف إذا تم تحديده if output_file: try: ext = os.path.splitext(output_file)[1].lower() if ext == '.csv': forecast_df.to_csv(output_file, index=False, encoding='utf-8-sig') elif ext in ['.xlsx', '.xls']: forecast_df.to_excel(output_file, index=False) elif ext == '.json': forecast_df.to_json(output_file, orient='records', force_ascii=False) else: print(f"تنسيق غير مدعوم: {ext}") return None return output_file except Exception as e: print(f"خطأ في تصدير توقعات الأسعار: {str(e)}") return None return forecast_df