File size: 7,828 Bytes
01a1238 e814ccb 41eef54 01a1238 73fedea 01a1238 427638d 7d1b428 73fedea 01a1238 41eef54 9cb37fd 41eef54 07342b6 41eef54 e814ccb bf68831 9cb37fd 41eef54 01a1238 fd01f78 7d1b428 01a1238 15cdc23 73fedea dddd52a 73fedea dddd52a fd01f78 73fedea 41eef54 73fedea 665318f 7875b25 73fedea 665318f 73fedea 665318f 01a1238 73fedea dddd522 a911135 fd01f78 73fedea fd01f78 427638d 73fedea 427638d 73fedea 01a1238 73fedea 01a1238 73fedea c335f43 73fedea c335f43 01a1238 73fedea 01a1238 73fedea 01a1238 73fedea |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# App/Messages/Routes.py
### In App/Messages/Routes.py
from fastapi import APIRouter, HTTPException, Depends
from typing import List
import asyncio
from App.Users.Model import User
from App.Users.dependencies import get_admin_user, UserType
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException, Depends
from typing import List
from .Model import Message
from .Schema import MessageCreate, MessageResponse
from typing import Optional
import re
from datetime import datetime
from decimal import Decimal
from App.Payments.PaymentsRoutes import create_payment
from App.Payments.Schema import CreatePaymentRequest
from App.Users.Model import User
from App.Payments.Schema import PaymentMethod
from App.Payments.Model import Payment
from App.Plans.Model import Plan
from tortoise.contrib.pydantic import pydantic_model_creator
import traceback
import logging
from App.Users.dependencies import (
get_current_active_user,
UserType,
get_admin_user,
) # Assuming you have a dependency to get the current user
message_router = APIRouter(tags=["Messages"], prefix="/messages")
logging.basicConfig(level=logging.WARNING)
class BroadcastMessageRequest(BaseModel):
message: str
@message_router.post("/broadcast")
async def broadcast_message(request: BroadcastMessageRequest):
# # Ensure the requesting user is an admin
# if current_user.user_type != UserType.ADMIN:
# raise HTTPException(
# status_code=403,
# detail="User does not have permission to perform a broadcast.",
# )
# Fetch all users
users = await User.all()
# Create a list of coroutines for sending the message
tasks = [user.send_message(request.message) for user in users]
# Run them concurrently
await asyncio.gather(*tasks)
return {"status": 200, "message": "Broadcast message sent to all users."}
@message_router.get("/", response_model=List[MessageResponse])
async def get_all_messages(current_user: User = Depends(get_admin_user)):
# Check if the current user is an admin
if current_user.user_type != UserType.ADMIN:
raise HTTPException(
status_code=403,
detail="User does not have permission to access this resource",
)
# Fetch all messages from the database
messages = await Message.all()
# Serialize the messages
return [MessageResponse.from_orm(message) for message in messages]
@message_router.post("/sms_received", response_model=MessageResponse)
async def receive_message(message_data: MessageCreate):
Message_Pydantic = pydantic_model_creator(Message)
try:
# Extract data from the message content using regex
text = message_data.payload.message
parsed_data = parse_message_content(text)
print(parsed_data)
# Validate parsed_data
if not parsed_data:
# Create a new message record with parsed_data as None
message = await Message.create(
device_id=message_data.deviceId,
event=message_data.event,
message_id=message_data.id,
webhook_id=message_data.webhookId,
message_content=message_data.payload.message,
phone_number=message_data.payload.phoneNumber,
received_at=message_data.payload.receivedAt,
sim_number=message_data.payload.simNumber,
parsed_data=None, # Set parsed_data as None
)
# Return the created message
data = await Message_Pydantic.from_tortoise_orm(message)
return data
required_keys = ["phone_number", "amount_received", "transaction_id"]
missing_keys = [key for key in required_keys if key not in parsed_data]
if missing_keys:
raise HTTPException(
status_code=400, detail=f"Missing keys in parsed data: {missing_keys}"
)
# Prevent double entry from the SMS gateway app
message = await Message.get_or_none(message_id=message_data.id)
if not message:
# Process Mpesa Payments
user: User = await User.get_or_none(phoneNumber=parsed_data["phone_number"])
data_plan: Plan = await Plan.get_or_none(
amount=Decimal(parsed_data["amount_received"])
)
payment_details = CreatePaymentRequest(
user_id=user.id if user else None,
plan_id=str(data_plan.id) if data_plan else None,
amount=Decimal(parsed_data["amount_received"]),
payment_method=PaymentMethod.MPESA,
transaction_id=parsed_data["transaction_id"],
)
# Ensure 'create_payment' is an async function
payment = await create_payment(payment_details, internal=True)
if isinstance(payment, Payment):
await payment.create_subscription_or_balance()
else:
## this is not a paymemt instance it is a json response due to an error
return MessageResponse(**message_data.dict())
# Create a new message record with parsed_data
message = await Message.create(
device_id=message_data.deviceId,
event=message_data.event,
message_id=message_data.id,
webhook_id=message_data.webhookId,
message_content=message_data.payload.message,
phone_number=message_data.payload.phoneNumber,
received_at=message_data.payload.receivedAt,
sim_number=message_data.payload.simNumber,
parsed_data=parsed_data,
)
# Serialize the message
data = await Message_Pydantic.from_tortoise_orm(message)
return data
except Exception as e:
# Log the full traceback
traceback_str = "".join(traceback.format_exception(None, e, e.__traceback__))
logging.error(f"An error occurred: {traceback_str}")
# Provide a default error message if 'e' is empty
error_message = str(e) if str(e) else "An unexpected error occurred."
raise HTTPException(status_code=500, detail=error_message)
def parse_message_content(text: str) -> Optional[dict]:
# Regular expression to capture the data from the message
pattern = r"(\w+)\sConfirmed\.You have received Tsh([\d,]+\.\d{2}) from (\d{12}) - ([A-Z ]+) on (\d{1,2}/\d{1,2}/\d{2}) at ([\d:]+ [APM]+).*?balance\sis\sTsh([\d,]+\.\d{2})"
# Replace non-breaking spaces and other whitespace characters with regular spaces
text = re.sub(r"\s+", " ", text)
matches = re.search(pattern, text)
if matches:
data = {
"transaction_id": matches.group(1),
"amount_received": parse_decimal(matches.group(2)),
"phone_number": matches.group(3),
"name": matches.group(4).strip(),
"date": parse_date(matches.group(5), matches.group(6)),
"new_balance": parse_decimal(matches.group(7)),
}
return data
else:
# Return None if the message doesn't match the expected format
return None
def parse_decimal(amount_str: str) -> float:
# Remove commas and convert to float
amount_str = amount_str.replace(",", "")
return float(Decimal(amount_str))
def parse_date(date_str: str, time_str: str) -> str:
# Combine date and time strings and parse into ISO format
datetime_str = f"{date_str} {time_str}"
try:
dt = datetime.strptime(datetime_str, "%d/%m/%y %I:%M %p")
return dt.isoformat()
except ValueError:
# Log the error
logging.error(f"Failed to parse date: {datetime_str}")
# Return current time as fallback
return datetime.now().isoformat()
|