File size: 2,418 Bytes
ce4e319
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
from typing import Dict, Union

import tweepy
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Set up root logging at INFO level (takes effect when the module is imported).
logging.basicConfig(level=logging.INFO)

class TwitterError(Exception):
    """Exception type for errors related to the Twitter API."""

def validate_credentials(login: Dict[str, str]) -> bool:
    """Check that ``login`` holds a usable value for every required credential.

    The required keys are ``api_key``, ``secret_key``, ``bearer_key``,
    ``access_token`` and ``access_token_secret``.

    Args:
        login: Mapping of credential names to their values.

    Returns:
        True when every required key is present with a non-empty value.
        False when a key is missing, a value is empty or (for strings)
        whitespace-only, or ``login`` is ``None`` or not a container at all.
    """
    required_keys = (
        'api_key',
        'secret_key',
        'bearer_key',
        'access_token',
        'access_token_secret',
    )

    def _has_value(key: str) -> bool:
        # Membership is checked before indexing so that mappings with
        # insert-on-read behaviour (e.g. defaultdict) are not mutated.
        if key not in login:
            return False
        value = login[key]
        # A blank string is as useless as a missing credential.
        if isinstance(value, str):
            return bool(value.strip())
        return bool(value)

    try:
        return all(_has_value(key) for key in required_keys)
    except TypeError:
        # ``login`` is None or another object that supports neither ``in``
        # nor indexing; report it as invalid instead of crashing the caller.
        return False

@retry(
    # Only retry failures that may succeed on a later attempt: rate limiting
    # (HTTP 429) and Twitter-side server errors (HTTP 5xx). Authentication or
    # validation failures would fail identically every time.
    retry=retry_if_exception_type(
        (tweepy.TooManyRequests, tweepy.TwitterServerError)
    ),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True,
)
def _post_tweet(client, text: str):
    """Send ``text`` through ``client``, retrying transient API failures.

    Makes up to three attempts with an exponential wait of 4 to 10 seconds
    between them. Because ``reraise=True``, the last exception is re-raised
    unchanged once the attempts are exhausted; non-transient exceptions
    propagate immediately.
    """
    return client.create_tweet(text=text)


def create_tweet(text: str, login: Dict[str, str]) -> Dict[str, Union[str, bool]]:
    """
    Create a tweet, returning a result dict instead of raising.

    The retry policy lives on the ``_post_tweet`` helper rather than on this
    function: this function converts every exception into an error dict, so
    a retry decorator placed here would never see a failure to retry.

    Args:
        text (str): Tweet content; must be non-empty and at most 280
            characters as measured by ``len``.
        login (dict): Twitter API credentials with the keys ``api_key``,
            ``secret_key``, ``bearer_key``, ``access_token`` and
            ``access_token_secret``.

    Returns:
        dict: ``{"success": True, "tweet_id": ..., "url": ...}`` on success.
        dict: ``{"error": "<message>"}`` when validation fails, the API call
            fails, or the response carries no data.
    """
    try:
        # Cheap local validation first, so no network call is made for
        # input that cannot succeed.
        if not text:
            return {"error": "No tweet content provided"}

        if not validate_credentials(login):
            return {"error": "Invalid or missing Twitter credentials"}

        if len(text) > 280:
            return {"error": "Tweet exceeds 280 characters"}

        # Initialize Twitter client
        client = tweepy.Client(
            bearer_token=login['bearer_key'],
            consumer_key=login['api_key'],
            consumer_secret=login['secret_key'],
            access_token=login['access_token'],
            access_token_secret=login['access_token_secret'],
        )

        # Create tweet (transient failures are retried inside the helper)
        response = _post_tweet(client, text)

        if not (response and response.data):
            return {"error": "Failed to create tweet: No response data"}

        tweet_id = response.data['id']
        return {
            "success": True,
            "tweet_id": tweet_id,
            "url": f"https://twitter.com/user/status/{tweet_id}",
        }

    except tweepy.TweepyException as e:
        logger.error("Twitter API error: %s", e)
        return {"error": f"Twitter API error: {e}"}
    except Exception as e:
        # Unexpected failure: keep the traceback in the log for diagnosis.
        logger.exception("Unexpected error creating tweet: %s", e)
        return {"error": f"Failed to create tweet: {e}"}