Spaces:
Sleeping
Sleeping
""" | |
خدمة التنبؤ بالأسعار | |
""" | |
import pandas as pd | |
import numpy as np | |
import joblib | |
import os | |
from datetime import datetime, timedelta | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.model_selection import train_test_split | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score | |
import config | |
class PricePrediction: | |
"""خدمة التنبؤ بالأسعار باستخدام التعلم الآلي""" | |
def __init__(self): | |
"""تهيئة خدمة التنبؤ بالأسعار""" | |
self.model_path = config.PRICE_PREDICTION_MODEL | |
self.model = self._load_model() | |
self.scaler = None | |
self.materials_data = self._load_materials_data() | |
self.market_indices = self._load_market_indices() | |
def _load_model(self): | |
"""تحميل نموذج التنبؤ المدرب مسبقاً""" | |
try: | |
if os.path.exists(self.model_path): | |
model = joblib.load(self.model_path) | |
return model | |
else: | |
# إذا لم يكن النموذج موجوداً، قم بإنشاء نموذج جديد | |
model = RandomForestRegressor( | |
n_estimators=100, | |
max_depth=15, | |
min_samples_split=5, | |
min_samples_leaf=2, | |
random_state=42 | |
) | |
return model | |
except Exception as e: | |
print(f"خطأ في تحميل نموذج التنبؤ: {str(e)}") | |
return RandomForestRegressor(random_state=42) | |
def _load_materials_data(self): | |
"""تحميل بيانات المواد وأسعارها التاريخية""" | |
# محاكاة تحميل البيانات من مصدر بيانات | |
materials_data = { | |
'خرسانة': { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'سعر': [750, 740, 735, 730, 720, 715, 710, 700, 695, 690, 685, 680], | |
'وحدة': 'م3' | |
}, | |
'حديد تسليح': { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'سعر': [5500, 5450, 5400, 5350, 5300, 5250, 5200, 5150, 5100, 5050, 5000, 4950], | |
'وحدة': 'طن' | |
}, | |
'إسمنت': { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'سعر': [25, 25, 24.5, 24.5, 24, 24, 23.5, 23.5, 23, 23, 22.5, 22.5], | |
'وحدة': 'كيس' | |
}, | |
'رمل': { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'سعر': [140, 140, 135, 135, 130, 130, 125, 125, 120, 120, 115, 115], | |
'وحدة': 'م3' | |
}, | |
'بلوك خرساني': { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'سعر': [11, 11, 10.5, 10.5, 10, 10, 9.5, 9.5, 9, 9, 8.5, 8.5], | |
'وحدة': 'قطعة' | |
} | |
} | |
return materials_data | |
def _load_market_indices(self): | |
"""تحميل مؤشرات السوق المؤثرة على الأسعار""" | |
# محاكاة تحميل البيانات من مصدر بيانات | |
market_indices = { | |
'تاريخ': [datetime(2025, 1, 1) - timedelta(days=30*i) for i in range(12)], | |
'مؤشر_البناء': [105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94], | |
'مؤشر_النفط': [80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69], | |
'مؤشر_سعر_الصرف': [3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75, 3.75], | |
'مؤشر_التضخم': [2.5, 2.4, 2.3, 2.2, 2.1, 2.0, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4] | |
} | |
return market_indices | |
def train(self, training_data=None): | |
""" | |
تدريب نموذج التنبؤ بالأسعار | |
المعلمات: | |
training_data: بيانات التدريب (اختياري)، إذا لم يتم توفيرها سيتم استخدام البيانات المتاحة | |
إرجاع: | |
مؤشرات أداء النموذج | |
""" | |
# تجهيز بيانات التدريب | |
if training_data is None: | |
# استخدام البيانات المتاحة لتوليد مجموعة تدريب | |
X, y = self._prepare_training_data() | |
else: | |
X, y = self._extract_features_target(training_data) | |
# تقسيم البيانات إلى تدريب واختبار | |
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42) | |
# تطبيع البيانات | |
self.scaler = StandardScaler() | |
X_train_scaled = self.scaler.fit_transform(X_train) | |
X_test_scaled = self.scaler.transform(X_test) | |
# تدريب النموذج | |
self.model.fit(X_train_scaled, y_train) | |
# تقييم النموذج | |
y_pred = self.model.predict(X_test_scaled) | |
# حساب مؤشرات الأداء | |
mae = mean_absolute_error(y_test, y_pred) | |
rmse = np.sqrt(mean_squared_error(y_test, y_pred)) | |
r2 = r2_score(y_test, y_pred) | |
# حفظ النموذج | |
try: | |
joblib.dump(self.model, self.model_path) | |
joblib.dump(self.scaler, os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl')) | |
except Exception as e: | |
print(f"خطأ في حفظ النموذج: {str(e)}") | |
return { | |
'mae': mae, | |
'rmse': rmse, | |
'r2': r2 | |
} | |
def _prepare_training_data(self): | |
"""تجهيز بيانات التدريب من البيانات المتاحة""" | |
# توليد بيانات تدريب افتراضية | |
data = [] | |
target = [] | |
# استخدام بيانات المواد وأسعارها التاريخية | |
for material_name, material_info in self.materials_data.items(): | |
for i in range(len(material_info['تاريخ'])): | |
# استخراج المؤشرات في التاريخ المقابل | |
date_index = self.market_indices['تاريخ'].index(material_info['تاريخ'][i]) if material_info['تاريخ'][i] in self.market_indices['تاريخ'] else 0 | |
# تكوين ميزات التدريب (المؤشرات السوقية والشهر) | |
features = [ | |
material_info['تاريخ'][i].month, # الشهر | |
self.market_indices['مؤشر_البناء'][date_index], | |
self.market_indices['مؤشر_النفط'][date_index], | |
self.market_indices['مؤشر_سعر_الصرف'][date_index], | |
self.market_indices['مؤشر_التضخم'][date_index] | |
] | |
# إضافة معرّف للمادة (تمثيل رقمي) | |
material_id = list(self.materials_data.keys()).index(material_name) | |
features.append(material_id) | |
data.append(features) | |
target.append(material_info['سعر'][i]) | |
# إضافة ضوضاء عشوائية لزيادة حجم البيانات | |
for _ in range(5): | |
noisy_features = features.copy() | |
for j in range(1, 5): # إضافة ضوضاء للمؤشرات فقط | |
noisy_features[j] += np.random.normal(0, 0.5) | |
noisy_price = material_info['سعر'][i] * (1 + np.random.normal(0, 0.02)) # ضوضاء 2% | |
data.append(noisy_features) | |
target.append(noisy_price) | |
return np.array(data), np.array(target) | |
def _extract_features_target(self, training_data): | |
"""استخراج الميزات والأهداف من بيانات التدريب""" | |
# استخراج الميزات والأهداف من البيانات المقدمة | |
features = [] | |
target = [] | |
for item in training_data: | |
features.append([ | |
item['date'].month, # الشهر | |
item['building_index'], | |
item['oil_index'], | |
item['exchange_rate'], | |
item['inflation_rate'], | |
item['material_id'] | |
]) | |
target.append(item['price']) | |
return np.array(features), np.array(target) | |
def predict_prices(self, materials, prediction_date=None, market_conditions=None): | |
""" | |
التنبؤ بأسعار المواد | |
المعلمات: | |
materials: قائمة المواد المطلوب التنبؤ بأسعارها | |
prediction_date: تاريخ التنبؤ (اختياري)، إذا لم يتم توفيره سيتم استخدام التاريخ الحالي | |
market_conditions: ظروف السوق (اختياري)، إذا لم يتم توفيرها سيتم استخدام آخر قيم متاحة | |
إرجاع: | |
قاموس بأسعار المواد المتنبأ بها | |
""" | |
if prediction_date is None: | |
prediction_date = datetime.now() | |
if market_conditions is None: | |
# استخدام آخر قيم متاحة للمؤشرات | |
market_conditions = { | |
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0], | |
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0], | |
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], | |
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] | |
} | |
# التحقق من وجود المواد في البيانات | |
material_names = list(self.materials_data.keys()) | |
valid_materials = [m for m in materials if m in material_names] | |
if not valid_materials: | |
return {} | |
# تحميل المعايير إذا كانت متوفرة | |
scaler_path = os.path.join(os.path.dirname(self.model_path), 'price_scaler.pkl') | |
if self.scaler is None and os.path.exists(scaler_path): | |
try: | |
self.scaler = joblib.load(scaler_path) | |
except Exception as e: | |
print(f"خطأ في تحميل المعايير: {str(e)}") | |
# إنشاء معايير جديدة | |
X, _ = self._prepare_training_data() | |
self.scaler = StandardScaler() | |
self.scaler.fit(X) | |
# إعداد ميزات التنبؤ | |
features = [] | |
for material in valid_materials: | |
material_id = material_names.index(material) | |
material_features = [ | |
prediction_date.month, # الشهر | |
market_conditions['مؤشر_البناء'], | |
market_conditions['مؤشر_النفط'], | |
market_conditions['مؤشر_سعر_الصرف'], | |
market_conditions['مؤشر_التضخم'], | |
material_id | |
] | |
features.append(material_features) | |
# تطبيع الميزات | |
if self.scaler is not None: | |
features_scaled = self.scaler.transform(features) | |
else: | |
features_scaled = features | |
# التنبؤ بالأسعار | |
predicted_prices = self.model.predict(features_scaled) | |
# إرجاع النتائج | |
results = {} | |
for i, material in enumerate(valid_materials): | |
# تطبيق عامل تصحيح (2% عشوائية) | |
correction_factor = 1.0 + np.random.uniform(-0.02, 0.02) | |
price = max(0, predicted_prices[i] * correction_factor) | |
results[material] = { | |
'سعر': price, | |
'وحدة': self.materials_data[material]['وحدة'], | |
'تاريخ_التنبؤ': prediction_date.strftime('%Y-%m-%d'), | |
'هامش_الخطأ': '±5%' # تقدير هامش الخطأ | |
} | |
return results | |
def get_price_trends(self, material, periods=6): | |
""" | |
الحصول على اتجاهات الأسعار المستقبلية | |
المعلمات: | |
material: المادة المطلوب التنبؤ باتجاهات أسعارها | |
periods: عدد الفترات المستقبلية (الشهور) | |
إرجاع: | |
قائمة بالأسعار المتوقعة للفترات المستقبلية | |
""" | |
if material not in self.materials_data: | |
return [] | |
# الحصول على التاريخ الحالي | |
current_date = datetime.now() | |
# التنبؤ بالأسعار للفترات المستقبلية | |
price_trends = [] | |
for i in range(periods): | |
prediction_date = current_date + timedelta(days=30 * (i + 1)) | |
# افتراض تغيرات طفيفة في المؤشرات مع مرور الوقت | |
market_conditions = { | |
'مؤشر_البناء': self.market_indices['مؤشر_البناء'][0] * (1 + 0.01 * i), # زيادة 1% شهرياً | |
'مؤشر_النفط': self.market_indices['مؤشر_النفط'][0] * (1 + 0.005 * i), # زيادة 0.5% شهرياً | |
'مؤشر_سعر_الصرف': self.market_indices['مؤشر_سعر_الصرف'][0], # ثابت | |
'مؤشر_التضخم': self.market_indices['مؤشر_التضخم'][0] * (1 + 0.01 * i) # زيادة 1% شهرياً | |
} | |
# التنبؤ بالسعر | |
predicted_price = self.predict_prices([material], prediction_date, market_conditions) | |
price_trends.append({ | |
'تاريخ': prediction_date.strftime('%Y-%m'), | |
'سعر': predicted_price[material]['سعر'] if material in predicted_price else 0 | |
}) | |
return price_trends | |
def analyze_factors(self, material): | |
""" | |
تحليل العوامل المؤثرة على سعر المادة | |
المعلمات: | |
material: المادة المطلوب تحليلها | |
إرجاع: | |
قاموس بالعوامل المؤثرة وأهميتها النسبية | |
""" | |
if material not in self.materials_data or not hasattr(self.model, 'feature_importances_'): | |
return {} | |
# الحصول على أهمية الميزات من النموذج | |
feature_importances = self.model.feature_importances_ | |
# أسماء الميزات | |
feature_names = ['الشهر', 'مؤشر البناء', 'مؤشر النفط', 'سعر الصرف', 'معدل التضخم', 'نوع المادة'] | |
# ترتيب الميزات حسب الأهمية | |
importance_pairs = [(name, importance) for name, importance in zip(feature_names, feature_importances)] | |
importance_pairs.sort(key=lambda x: x[1], reverse=True) | |
# إرجاع العوامل المؤثرة وأهميتها | |
factors = {} | |
for name, importance in importance_pairs: | |
factors[name] = round(importance * 100, 2) # تحويل إلى نسبة مئوية | |
return { | |
'العوامل_المؤثرة': factors, | |
'المادة': material, | |
'وحدة': self.materials_data[material]['وحدة'], | |
'سعر_حالي': self.materials_data[material]['سعر'][0], | |
'اتجاه_السعر': self._get_price_trend(material) | |
} | |
def _get_price_trend(self, material): | |
"""تحديد اتجاه سعر المادة بناءً على البيانات التاريخية""" | |
if material not in self.materials_data: | |
return "غير معروف" | |
prices = self.materials_data[material]['سعر'] | |
if len(prices) < 2: | |
return "غير معروف" | |
# حساب متوسط التغير الشهري | |
price_changes = [(prices[i] - prices[i+1]) / prices[i+1] * 100 for i in range(len(prices)-1)] | |
avg_monthly_change = sum(price_changes) / len(price_changes) | |
if avg_monthly_change > 1: | |
return "ارتفاع حاد" | |
elif avg_monthly_change > 0.2: | |
return "ارتفاع معتدل" | |
elif avg_monthly_change > -0.2: | |
return "استقرار" | |
elif avg_monthly_change > -1: | |
return "انخفاض معتدل" | |
else: | |
return "انخفاض حاد" | |
def export_price_forecast(self, materials, periods=6, output_file=None): | |
""" | |
تصدير توقعات الأسعار إلى ملف | |
المعلمات: | |
materials: قائمة المواد المطلوب التنبؤ بأسعارها | |
periods: عدد الفترات المستقبلية (الشهور) | |
output_file: مسار ملف الإخراج (اختياري) | |
إرجاع: | |
مسار الملف المصدر أو البيانات مباشرة إذا لم يتم تحديد ملف | |
""" | |
# التحقق من وجود المواد في البيانات | |
valid_materials = [m for m in materials if m in self.materials_data] | |
if not valid_materials: | |
return None | |
# إعداد بيانات التوقعات | |
forecast_data = [] | |
for material in valid_materials: | |
# الحصول على اتجاهات الأسعار | |
price_trends = self.get_price_trends(material, periods) | |
for trend in price_trends: | |
forecast_data.append({ | |
'المادة': material, | |
'الوحدة': self.materials_data[material]['وحدة'], | |
'التاريخ': trend['تاريخ'], | |
'السعر المتوقع': trend['سعر'], | |
'هامش الخطأ': '±5%' | |
}) | |
# تحويل البيانات إلى DataFrame | |
forecast_df = pd.DataFrame(forecast_data) | |
# تصدير البيانات إلى ملف إذا تم تحديده | |
if output_file: | |
try: | |
ext = os.path.splitext(output_file)[1].lower() | |
if ext == '.csv': | |
forecast_df.to_csv(output_file, index=False, encoding='utf-8-sig') | |
elif ext in ['.xlsx', '.xls']: | |
forecast_df.to_excel(output_file, index=False) | |
elif ext == '.json': | |
forecast_df.to_json(output_file, orient='records', force_ascii=False) | |
else: | |
print(f"تنسيق غير مدعوم: {ext}") | |
return None | |
return output_file | |
except Exception as e: | |
print(f"خطأ في تصدير توقعات الأسعار: {str(e)}") | |
return None | |
return forecast_df |