import asyncio from crewai import Crew, Process from textwrap import dedent import json from crypto_analysis_agents import CryptoAnalysisAgents from crypto__analysis_tasks import CryptoAnalysisTasks class CryptoCrew: def __init__(self, crypto): self.crypto = crypto def run(self): return asyncio.run(self.run_async()) async def run_async(self): agents = CryptoAnalysisAgents() tasks = CryptoAnalysisTasks() market_analyst_agent = agents.market_analyst() technical_analyst_agent = agents.technical_analyst() crypto_advisor_agent = agents.crypto_advisor() market_research_task = tasks.market_research(market_analyst_agent, self.crypto) technical_analysis_task = tasks.technical_analysis(technical_analyst_agent, self.crypto) sentiment_analysis_task = tasks.sentiment_analysis(market_analyst_agent, self.crypto) recommend_task = tasks.recommend(crypto_advisor_agent, self.crypto) crew = Crew( agents=[ market_analyst_agent, technical_analyst_agent, crypto_advisor_agent ], tasks=[ market_research_task, technical_analysis_task, sentiment_analysis_task, recommend_task ], verbose=True, process=Process.sequential, max_iterations=100, task_timeout=600 ) try: result = await asyncio.to_thread(crew.kickoff) return self.parse_result(result) except Exception as e: return {"summary": f"Analysis failed: {str(e)}. Please try again."} def parse_result(self, result): parsed = { "summary": str(result), "sentiment": { "overall": "Neutral", "social_media": "Neutral", "news": "Neutral", "community": "Neutral" } } # Add your parsing logic here return parsed if __name__ == "__main__": print("## Welcome to Crypto Analysis Crew") print('-------------------------------') crypto = input(dedent(""" What is the cryptocurrency you want to analyze? """)) crypto_crew = CryptoCrew(crypto) result = crypto_crew.run() print("\n\n########################") print("## Here is the Report") print("########################\n") print(json.dumps(result, indent=2))