import re
import requests
import chardet
import config as cfg
from bs4 import BeautifulSoup
from pathlib import Path
from transformers import AutoTokenizer
from duckduckgo_search import DDGS

def log_in(uid, state):
    state['chat_history'] = []
    state['thinking_history'] = ''
    state['uid'] = uid
    if uid != 0:
        response = f"Your Log In UID: {uid}"
    else:
        response = "You Are Not Logged In Yet, Use Public Directory"
    user_dir = Path(cfg.USER_DIR) / str(uid)
    user_dir.mkdir(parents=True, exist_ok=True)
    state['user_dir'] = user_dir
    # Load the user's saved chat sessions
    state['available_history'] = []
    for json_file in user_dir.rglob("*.json"):
        state['available_history'].append(json_file.stem)
    return response, state

def clean_response(response_content):
    # Strip markdown bold markers and LaTeX math delimiters from the model output
    response_content = re.sub(r'\*\*|__', '', response_content)
    response_content = re.sub(r'\\\(|\\\)|\\\[|\\\]', '', response_content)
    response_content = re.sub(r'\\boxed\{([^}]*)\}', r'\1', response_content)
    response_content = re.sub(r'\\\\', '', response_content)
    response_content = re.sub(r'\n\s*\n', '\n\n', response_content)
    # Collapse runs of spaces/tabs only, so the paragraph breaks normalized above survive
    response_content = re.sub(r'[ \t]+', ' ', response_content)
    return response_content.strip()
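
# A minimal sketch of what clean_response does (hypothetical input, not from the project):
#   clean_response("**Answer**: \(x + y\) gives \boxed{42}")
#   -> "Answer: x + y gives 42"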

def parse_output(response_content):
    """Split a model response into its <think> reasoning block and the final answer."""
    cleaned_content = clean_response(response_content)
    if "<think>" in cleaned_content and "</think>" in cleaned_content:
        split_pattern = r'<think>|</think>'
        parts = re.split(split_pattern, cleaned_content)
        return parts[1], parts[2]
    return None, cleaned_content
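
# Illustrative example (hypothetical response string):
#   parse_output("<think>reason step by step</think>The answer is 4")
#   -> ("reason step by step", "The answer is 4")
# When no <think> block is present, the first element is None.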

def parse_chat_history(chat_history):
    """Parse a saved chat history into (user message, assistant response) pairs for the chatbot.

    Example chat_history:
    [
        {
            "role": "user",
            "content": "hello"
        },
        {
            "role": "assistant",
            "content": " Hello! How can I assist you today? 😊"
        }
    ]

    Args:
        chat_history (list): messages saved as a list of {"role", "content"} dicts
    """
    from gradio import Warning
    if len(chat_history) % 2 != 0:
        Warning('Chat history may be incomplete: user and assistant message counts differ, dropping the last message...')
        chat_history = chat_history[:-1]
    if len(chat_history) == 0:
        Warning('Chat history is empty or cannot be paired, loading failed...')
        return []
    messages = []
    responses = []
    for conversation in chat_history:
        if conversation['role'] == 'user':
            messages.append(conversation['content'])
        elif conversation['role'] == 'assistant':
            responses.append(conversation['content'])
    if len(messages) != len(responses):
        Warning('User and assistant messages cannot be paired, loading chat history failed...')
        return []
    return list(zip(messages, responses))
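
# Illustrative example (hypothetical saved history):
#   parse_chat_history([{"role": "user", "content": "hi"},
#                       {"role": "assistant", "content": "hello"}])
#   -> [("hi", "hello")]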

def web_search(query: str, max_results: int = 3):
    """Fetch web search results and extract the key content from each page."""
    try:
        # Collect search result links from DuckDuckGo
        with DDGS() as ddgs:
            results = [r for r in ddgs.text(query, max_results=max_results)]
        # Extract the main body text of each page
        web_contents = []
        for result in results:
            try:
                response = requests.get(result['href'], timeout=5)
                encoding = chardet.detect(response.content)['encoding']
                if response.encoding != encoding:
                    response.encoding = encoding
                soup = BeautifulSoup(response.text, 'html.parser')
                # Pick the main text container (adjust per site structure if needed)
                main_content = soup.find('main') or soup.find('article') or soup.body
                web_contents.append({
                    'title': result['title'],
                    'content': main_content.get_text(separator=' ', strip=True)[:1000]  # limit length
                })
            except Exception as e:
                print('Failed to fetch this search result...', e)
                continue
        return web_contents
    except Exception as e:
        print('Web search failed, returning empty list...', e)
        return []
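
# Sketch of the returned structure (the query, title and content below are hypothetical):
#   web_search("python requests timeout")
#   -> [{'title': 'Some page title', 'content': 'first ~1000 characters of the page text'}, ...]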

def parse_net_search(search_res):
    res = []
    for item in search_res:
        if len(item['content']) > 0:
            res.append(f"Title:\n{item['title']}\nContent:\n{item['content']}\n")
    return res

def wash_up_content(doc_score):
    """Split a recalled document into non-empty lines and prefix the first line with a recall marker.

    Args:
        doc_score (tuple | str): either (document text, recall score) or just the document text;
            the text may span multiple lines separated by newline characters
    """
    if isinstance(doc_score, tuple):
        doc, score = doc_score
    else:
        doc = doc_score
        score = None
    res = list(filter(lambda x: len(x) > 0, doc.split('\n')))
    prefix = '✅<br>' if score is None else '✅Recall with score:{:.3f}<br>'.format(score)
    res[0] = prefix + res[0]
    return res
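
# Illustrative example (hypothetical recalled chunk and score):
#   wash_up_content(("first line\n\nsecond line", 0.871))
#   -> ['✅Recall with score:0.871<br>first line', 'second line']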

MODEL_HF_MAPPING = {
    "qwen2.5:14b-instruct": "Qwen/Qwen2.5-14B-Instruct",
    "qwen2.5:32b-instruct": "Qwen/Qwen2.5-32B-Instruct",
    "qwen2.5:7b-instruct": "Qwen/Qwen2.5-7B-Instruct",
    "qwen2.5:3b-instruct": "Qwen/Qwen2.5-3B-Instruct",
    "qwen2.5:0.5b-instruct": "Qwen/Qwen2.5-0.5B-Instruct",
    "qwen2.5:0.5b": "Qwen/Qwen2.5-0.5B",
    "qwen2.5:32b": "Qwen/Qwen2.5-32B",
    "qwen3:32b": "Qwen/Qwen3-32B",
    "qwen3:14b": "Qwen/Qwen3-14B",
    "qwen3:4b": "Qwen/Qwen3-4B",
    "qwen3:8b": "Qwen/Qwen3-8B",
    "qwen3:30b-a3b": "Qwen/Qwen3-30B-A3B",
    "qwq": "Qwen/QwQ-32B",
    "deepseek-r1:14b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-14B",
    "deepseek-r1:7b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "deepseek-r1:32b": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
}

def load_tokenizer(model_name):
    # Map the local model name to its Hugging Face repo id, falling back to the name itself
    hf_model_name = MODEL_HF_MAPPING.get(model_name, model_name)
    return AutoTokenizer.from_pretrained(hf_model_name, use_fast=True)

def messages_to_prompt(messages):
    # Concatenate messages in the format the model expects
    # (this is only an example; adjust to the actual model's chat template)
    prompt = ""
    for msg in messages:
        prompt += f"{msg['role']}: {msg['content']}\n"
    return prompt.strip()

def count_tokens_local(messages, tokenizer):
    prompt = messages_to_prompt(messages)
    return len(tokenizer(prompt, return_tensors=None, truncation=False)["input_ids"])
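
# Illustrative usage (a minimal sketch; the model tag must exist in MODEL_HF_MAPPING or be a valid HF repo id):
#   tokenizer = load_tokenizer("qwen2.5:7b-instruct")
#   count_tokens_local([{"role": "user", "content": "hello"}], tokenizer)
#   -> number of tokens in "user: hello" under that tokenizer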

def concate_metadata(metadata):
    """Join every key/value pair of a Document object's metadata into one string.

    Args:
        metadata (dict): the metadata dict of a Document object
    """
    return '\n'.join([f"{k}: {v}" for k, v in metadata.items()])

if __name__ == "__main__":
    query = "What is today's date"
    ret = web_search(query)
    for item in ret:
        print(item)
    ret = parse_net_search(ret)
    print(ret)