Spaces:
Running
Running
import os | |
import time | |
import logging | |
import requests | |
from apscheduler.schedulers.background import BackgroundScheduler | |
# 配置日志记录 | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/status" | |
def get_credit_usage(key): | |
""" | |
获取单个 key 的额度使用情况。 | |
Args: | |
key: 要查询的 API key。 | |
Returns: | |
包含额度信息的字典,如果出错则返回 None。 | |
""" | |
headers = { | |
"Authorization": f"Bearer {key}" | |
} | |
try: | |
response = requests.get(API_ENDPOINT, headers=headers) | |
response.raise_for_status() # 检查请求是否成功 | |
data = response.json() | |
return data | |
except requests.exceptions.RequestException as e: | |
logging.error(f"请求 API 时出错,key:{key}, 错误信息:{e}") | |
except ValueError as e: | |
logging.error(f"解析 JSON 响应时出错,key:{key}, 错误信息:{e}") | |
return None | |
def load_keys_and_get_usage(): | |
""" | |
从环境变量中加载 keys,并获取每个 key 的额度使用情况,然后记录到日志中。 | |
""" | |
keys_str = os.environ.get("KEYS") | |
if keys_str: | |
keys = [key.strip() for key in keys_str.split(',')] | |
logging.info(f"加载的 keys:{keys}") | |
for key in keys: | |
usage_data = get_credit_usage(key) | |
if usage_data: | |
logging.info(f"Key: {key}, 额度信息: {usage_data}") | |
else: | |
logging.warning(f"获取 key:{key} 的额度信息失败。") | |
else: | |
logging.warning("环境变量 KEYS 未设置。") | |
# 创建一个后台调度器 | |
scheduler = BackgroundScheduler() | |
# 添加定时任务,每小时执行一次 load_keys_and_get_usage 函数 | |
scheduler.add_job(load_keys_and_get_usage, 'interval', hours=1) | |
# 启动调度器 | |
scheduler.start() | |
# 为了保持程序运行,添加一个循环 | |
if __name__ == '__main__': | |
try: | |
while True: | |
time.sleep(120) | |
except (KeyboardInterrupt, SystemExit): | |
scheduler.shutdown() | |