import re
from pathlib import Path

import requests
import chardet
from bs4 import BeautifulSoup
from transformers import AutoTokenizer
from duckduckgo_search import DDGS

import config as cfg


def log_in(uid, state):
    """Initialize per-user session state and make sure the user's directory exists."""
    state['chat_history'] = []
    state['thinking_history'] = ''
    state['uid'] = uid
    if uid != 0:
        response = f"Your login UID: {uid}"
    else:
        response = "You are not logged in yet; using the public directory."
    user_dir = Path(cfg.USER_DIR) / str(uid)
    user_dir.mkdir(parents=True, exist_ok=True)
    state['user_dir'] = user_dir

    # Collect the names of previously saved conversations (one JSON file per session).
    state['available_history'] = []
    for json_file in user_dir.rglob("*.json"):
        state['available_history'].append(json_file.stem)

    return response, state

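# Illustrative usage (not part of the original module); assumes cfg.USER_DIR points
# at a writable directory:
#     reply, state = log_in(42, {})
#     state['available_history']  # stems of this user's previously saved JSON sessions
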
def clean_response(response_content):
    # Strip markdown bold/underline markers.
    response_content = re.sub(r'\*\*|__', '', response_content)
    # Strip LaTeX inline/display math delimiters.
    response_content = re.sub(r'\\\(|\\\)|\\\[|\\\]', '', response_content)
    # Unwrap \boxed{...} answers.
    response_content = re.sub(r'\\boxed\{([^}]*)\}', r'\1', response_content)
    response_content = re.sub(r'\\\\', '', response_content)
    # Collapse runs of blank lines into a single paragraph break.
    response_content = re.sub(r'\n\s*\n', '\n\n', response_content)
    # Collapse repeated spaces/tabs without destroying the paragraph breaks kept above.
    response_content = re.sub(r'[ \t]+', ' ', response_content)
    return response_content.strip()

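# Illustrative example (not part of the original module):
#     clean_response("**Answer:** \\(x = 2\\), \\boxed{4}")
#     -> "Answer: x = 2, 4"
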
def parse_output(response_content):
    """Split a reasoning-model reply into (thinking, answer); thinking is None if absent."""
    cleaned_content = clean_response(response_content)
    if "<think>" in cleaned_content and "</think>" in cleaned_content:
        parts = re.split(r'<think>|</think>', cleaned_content)
        return parts[1], parts[2]
    return None, cleaned_content

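# Illustrative example (not part of the original module):
#     thinking, answer = parse_output("<think>Compute 2+2.</think>The answer is 4.")
#     # thinking == "Compute 2+2.", answer == "The answer is 4."
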
def parse_chat_history(chat_history):
    """Convert a saved conversation into the (user, assistant) pair format the chatbot expects.

    Example of chat_history:
    [
        {
            "role": "user",
            "content": "hello"
        },
        {
            "role": "assistant",
            "content": " Hello! How can I assist you today? 😊"
        }
    ]

    Args:
        chat_history (list): messages with alternating "user"/"assistant" roles.
    """
    from gradio import Warning

    if len(chat_history) % 2 != 0:
        Warning('The saved history may be incomplete: user and assistant message counts do not match; dropping the last message...')
        chat_history = chat_history[:-1]

    if len(chat_history) == 0:
        Warning('The saved history is empty or cannot be paired; failed to load it...')
        return []

    messages = []
    responses = []

    for conversation in chat_history:
        if conversation['role'] == 'user':
            messages.append(conversation['content'])
        elif conversation['role'] == 'assistant':
            responses.append(conversation['content'])

    if len(messages) != len(responses):
        Warning('User and assistant messages cannot be paired; failed to load the history...')
        return []

    return list(zip(messages, responses))

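# Illustrative example (not part of the original module):
#     parse_chat_history([
#         {"role": "user", "content": "hello"},
#         {"role": "assistant", "content": "Hi there!"},
#     ])
#     -> [("hello", "Hi there!")]
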
def web_search(query: str, max_results: int = 3):
    """Run a web search and extract the main content of each result page."""
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))

        web_contents = []
        for result in results:
            try:
                response = requests.get(result['href'], timeout=5)
                # Fix mis-declared encodings before extracting text.
                encoding = chardet.detect(response.content)['encoding']
                if encoding and response.encoding != encoding:
                    response.encoding = encoding
                soup = BeautifulSoup(response.text, 'html.parser')

                main_content = soup.find('main') or soup.find('article') or soup.body
                web_contents.append({
                    'title': result['title'],
                    'content': main_content.get_text(separator=' ', strip=True)[:1000]
                })
            except Exception as e:
                print('Failed to fetch this search result, skipping...', e)
                continue
        return web_contents
    except Exception as e:
        print('Web search failed, returning an empty list...', e)
        return []

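# Illustrative return shape (not part of the original module); actual results depend on
# the live DuckDuckGo index and on each page being reachable:
#     web_search("python asyncio tutorial", max_results=1)
#     -> [{'title': '...', 'content': 'first ~1000 characters of the page text'}]
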
def parse_net_search(search_res):
    """Format web_search results into display strings, skipping pages with no content."""
    res = []
    for item in search_res:
        if len(item['content']) > 0:
            res.append(f"Title:\n{item['title']}\nContent:\n{item['content']}\n")
    return res

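# Illustrative example (not part of the original module):
#     parse_net_search([{'title': 'Example', 'content': 'Some page text'}])
#     -> ['Title:\nExample\nContent:\nSome page text\n']
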
def wash_up_content(doc_score):
    """Split a retrieved document into non-empty lines and prefix the first line with a recall marker.

    Args:
        doc_score (str | tuple): the full document text (may span multiple lines,
            separated by newlines), optionally paired with its recall score.
    """
    if isinstance(doc_score, tuple):
        doc, score = doc_score
    else:
        doc = doc_score
        score = None
    res = list(filter(lambda x: len(x) > 0, doc.split('\n')))
    prefix = '✅<br>' if score is None else '✅Recall with score:{:.3f}<br>'.format(score)
    res[0] = prefix + res[0]
    return res

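# Illustrative example (not part of the original module):
#     wash_up_content(("first line\n\nsecond line", 0.87))
#     -> ['✅Recall with score:0.870<br>first line', 'second line']
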
# Map local (Ollama-style) model tags to the matching Hugging Face tokenizer repos.
MODEL_HF_MAPPING = {
    "qwen2.5:14b-instruct": "Qwen/Qwen2.5-14B-Instruct",
    "qwen2.5:32b-instruct": "Qwen/Qwen2.5-32B-Instruct",
    "qwen2.5:7b-instruct": "Qwen/Qwen2.5-7B-Instruct",
    "qwen2.5:3b-instruct": "Qwen/Qwen2.5-3B-Instruct",
    "qwen2.5:0.5b-instruct": "Qwen/Qwen2.5-0.5B-Instruct",
    "qwen2.5:0.5b": "Qwen/Qwen2.5-0.5B",
    "qwen2.5:32b": "Qwen/Qwen2.5-32B",
    "qwen3:32b": "Qwen/Qwen3-32B",
    "qwen3:14b": "Qwen/Qwen3-14B",
    "qwen3:4b": "Qwen/Qwen3-4B",
    "qwen3:8b": "Qwen/Qwen3-8B",
    "qwen3:30b-a3b": "Qwen/Qwen3-30B-A3B",
    "qwq": "Qwen/QwQ-32B",
    "deepseek-r1:14b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
    "deepseek-r1:7b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "deepseek-r1:32b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
}

def load_tokenizer(model_name):
    """Load the Hugging Face tokenizer for a local model name (falls back to the name itself)."""
    hf_model_name = MODEL_HF_MAPPING.get(model_name, model_name)
    return AutoTokenizer.from_pretrained(hf_model_name, use_fast=True)

def messages_to_prompt(messages):
    """Flatten chat messages into a single "role: content" prompt string."""
    prompt = ""
    for msg in messages:
        prompt += f"{msg['role']}: {msg['content']}\n"
    return prompt.strip()

def count_tokens_local(messages, tokenizer):
    """Count prompt tokens locally with a Hugging Face tokenizer (no API call needed)."""
    prompt = messages_to_prompt(messages)
    return len(tokenizer(prompt, return_tensors=None, truncation=False)["input_ids"])

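# Illustrative usage (not part of the original module); downloads the tokenizer from
# the Hugging Face Hub on first use:
#     tokenizer = load_tokenizer("qwen2.5:7b-instruct")   # -> Qwen/Qwen2.5-7B-Instruct
#     count_tokens_local([{"role": "user", "content": "hello"}], tokenizer)
#     # -> number of tokens in "user: hello"
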
def concate_metadata(metadata):
    """Join every key/value pair of a Document's metadata into one newline-separated string.

    Args:
        metadata (dict): metadata attached to a Document object.
    """
    return '\n'.join([f"{k}: {v}" for k, v in metadata.items()])

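# Illustrative example (not part of the original module):
#     concate_metadata({"source": "report.pdf", "page": 3})
#     -> "source: report.pdf\npage: 3"
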
if __name__ == "__main__":
    query = "What's the date today?"
    ret = web_search(query)
    for item in ret:
        print(item)
    ret = parse_net_search(ret)
    print(ret)
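    # Additional illustrative checks (not in the original script): exercise the
    # response-parsing helpers without any network access.
    thinking, answer = parse_output("<think>Compute 2+2.</think>The answer is 4.")
    print(thinking, '|', answer)
    print(wash_up_content(("first line\n\nsecond line", 0.87)))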