# Spaces: Sleeping  (page-header residue from extraction; not part of the module)
import os | |
# from twilio.rest import Client | |
from agents.deals import Opportunity | |
import http.client | |
import urllib | |
from agents.agent import Agent | |
# Delivery-channel switches read by MessagingAgent at construction time and
# again each time an alert is sent.
# Uncomment the Twilio lines if you wish to use Twilio
DO_TEXT = False  # when True, alerts are sent as an SMS through Twilio
DO_PUSH = True  # when True, alerts are sent as a Pushover push notification
class MessagingAgent(Agent):
    """
    Agent that tells the user about a deal Opportunity.

    Delivery is by Pushover push notification and/or Twilio SMS, chosen by
    the module-level DO_PUSH and DO_TEXT constants. Credentials are read
    from environment variables, with placeholder strings as fallbacks.
    """

    name = "Messaging Agent"
    color = Agent.WHITE

    def __init__(self):
        """
        Set up this object to either do push notifications via Pushover,
        or SMS via Twilio,
        whichever is specified in the constants
        """
        self.log("Messaging Agent is initializing")
        if DO_TEXT:
            # account_sid / auth_token are only consumed by the Twilio
            # Client line below, which stays commented out until Twilio
            # is enabled.
            account_sid = os.getenv('TWILIO_ACCOUNT_SID', 'your-sid-if-not-using-env')
            auth_token = os.getenv('TWILIO_AUTH_TOKEN', 'your-auth-if-not-using-env')
            self.me_from = os.getenv('TWILIO_FROM', 'your-phone-number-if-not-using-env')
            self.me_to = os.getenv('MY_PHONE_NUMBER', 'your-phone-number-if-not-using-env')
            # self.client = Client(account_sid, auth_token)
            self.log("Messaging Agent has initialized Twilio")
        if DO_PUSH:
            self.pushover_user = os.getenv('PUSHOVER_USER', 'your-pushover-user-if-not-using-env')
            self.pushover_token = os.getenv('PUSHOVER_TOKEN', 'your-pushover-user-if-not-using-env')
            self.log("Messaging Agent has initialized Pushover")

    def message(self, text):
        """
        Send an SMS message using the Twilio API

        Relies on self.client, which only exists once the Twilio lines in
        this file have been uncommented.

        :param text: body of the SMS to send
        """
        self.log("Messaging Agent is sending a text message")
        self.client.messages.create(
            from_=self.me_from,
            body=text,
            to=self.me_to,
        )

    def push(self, text):
        """
        Send a Push Notification using the Pushover API

        The connection is always closed, and a non-200 reply is logged
        rather than silently ignored.

        :param text: body of the notification to send
        """
        # Imported explicitly: the file-level `import urllib` alone does not
        # guarantee that the `urllib.parse` submodule is loaded.
        import urllib.parse

        self.log("Messaging Agent is sending a push notification")
        payload = urllib.parse.urlencode({
            "token": self.pushover_token,
            "user": self.pushover_user,
            "message": text,
            "sound": "cashregister",
        })
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        conn = http.client.HTTPSConnection("api.pushover.net:443")
        try:
            conn.request("POST", "/1/messages.json", payload, headers)
            response = conn.getresponse()
            # Drain the body so the exchange is complete before closing.
            response.read()
            if response.status != 200:
                self.log(
                    "Messaging Agent push notification failed with "
                    f"HTTP status {response.status}"
                )
        finally:
            conn.close()

    def alert(self, opportunity: Opportunity):
        """
        Make an alert about the specified Opportunity

        Builds a one-line summary (price, estimate, discount, the first 10
        characters of the product description, and the deal URL) and sends
        it over every channel enabled by DO_TEXT / DO_PUSH.

        :param opportunity: the Opportunity to report
        """
        text = (
            f"Deal Alert! Price=${opportunity.deal.price:.2f}, "
            f"Estimate=${opportunity.estimate:.2f}, "
            f"Discount=${opportunity.discount:.2f} :"
            + opportunity.deal.product_description[:10] + '... '
            + opportunity.deal.url
        )
        if DO_TEXT:
            self.message(text)
        if DO_PUSH:
            self.push(text)
        self.log("Messaging Agent has completed")