from __future__ import annotations  # type: ignore[import-not-found]

from subprocess import Popen, PIPE as P

from langchain_experimental.tools.python.tool import PythonREPLTool as PYT
from langchain.agents import load_tools, create_structured_chat_agent as Agent, AgentExecutor as Ex, AgentType as Type
from langchain.agents.agent_toolkits import create_retriever_tool as crt
from langchain_community.agent_toolkits import FileManagementToolkit as FMT
from langchain.tools import Tool, YouTubeSearchTool as YTS
from langchain.memory import ConversationBufferMemory as MEM, RedisChatMessageHistory as HIS
from langchain.schema import SystemMessage as SM, HumanMessage as HM, AIMessage as AM
from langchain import hub

import os
import torch
import importlib.util
import logging
from typing import Any, Dict, Iterator, List, Mapping, Optional

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
from pydantic import ConfigDict, model_validator

from import_utils import (
    IMPORT_ERROR,
    is_ipex_available,
    is_openvino_available,
    is_optimum_intel_available,
    is_optimum_intel_version,
)
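# `import_utils` is expected to live next to this script and provide the
# availability/version checks for the optional Intel backends (optimum-intel,
# OpenVINO, IPEX) that `from_model_id` below relies on.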
DEFAULT_MODEL_ID = "gpt2"
DEFAULT_TASK = "text-generation"
VALID_TASKS = (
    "text2text-generation",
    "text-generation",
    "summarization",
    "translation",
)
DEFAULT_BATCH_SIZE = 4
_MIN_OPTIMUM_VERSION = "1.21"

logger = logging.getLogger(__name__)
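# The class below mirrors the `HuggingFacePipeline` LLM wrapper from
# `langchain_huggingface` (see its docstring), inlined here with a
# `torch.compile` tweak in `from_model_id`.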
class HuggingFacePipeline(BaseLLM):
    """HuggingFace Pipeline API.

    To use, you should have the ``transformers`` python package installed.

    Only supports `text-generation`, `text2text-generation`, `summarization` and
    `translation` for now.

    Example using from_model_id:
        .. code-block:: python

            from langchain_huggingface import HuggingFacePipeline

            hf = HuggingFacePipeline.from_model_id(
                model_id="gpt2",
                task="text-generation",
                pipeline_kwargs={"max_new_tokens": 10},
            )

    Example passing pipeline in directly:
        .. code-block:: python

            from langchain_huggingface import HuggingFacePipeline
            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

            model_id = "gpt2"
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModelForCausalLM.from_pretrained(model_id)
            pipe = pipeline(
                "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10
            )
            hf = HuggingFacePipeline(pipeline=pipe)
    """
    pipeline: Any = None  #: :meta private:
    model_id: Optional[str] = None
    """The model name. If not set explicitly by the user,
    it will be inferred from the provided pipeline (if available).
    If neither is provided, the DEFAULT_MODEL_ID will be used."""
    model_kwargs: Optional[dict] = None
    """Keyword arguments passed to the model."""
    pipeline_kwargs: Optional[dict] = None
    """Keyword arguments passed to the pipeline."""
    batch_size: int = DEFAULT_BATCH_SIZE
    """Batch size to use when passing multiple documents to generate."""

    model_config = ConfigDict(
        extra="forbid",
    )
    @model_validator(mode="before")
    @classmethod
    def pre_init_validator(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Ensure model_id is set either by pipeline or user input."""
        if "model_id" not in values:
            if "pipeline" in values and values["pipeline"]:
                values["model_id"] = values["pipeline"].model.name_or_path
            else:
                values["model_id"] = DEFAULT_MODEL_ID
        return values
    @classmethod
    def from_model_id(
        cls,
        model_id: str,
        task: str,
        backend: str = "default",
        device: Optional[int] = None,
        device_map: Optional[str] = None,
        model_kwargs: Optional[dict] = None,
        pipeline_kwargs: Optional[dict] = None,
        batch_size: int = DEFAULT_BATCH_SIZE,
        **kwargs: Any,
    ) -> HuggingFacePipeline:
        """Construct the pipeline object from model_id and task."""
        try:
            from transformers import (  # type: ignore[import]
                AutoModelForCausalLM,
                AutoModelForSeq2SeqLM,
                AutoTokenizer,
            )
            from transformers import pipeline as hf_pipeline  # type: ignore[import]
        except ImportError:
            raise ValueError(
                "Could not import transformers python package. "
                "Please install it with `pip install transformers`."
            )
        _model_kwargs = model_kwargs.copy() if model_kwargs else {}
        if device_map is not None:
            if device is not None:
                raise ValueError(
                    "Both `device` and `device_map` are specified. "
                    "`device` will override `device_map`. "
                    "You will most likely encounter unexpected behavior. "
                    "Please remove `device` and keep `device_map`."
                )
            if "device_map" in _model_kwargs:
                raise ValueError("`device_map` is already specified in `model_kwargs`.")
            _model_kwargs["device_map"] = device_map
        tokenizer = AutoTokenizer.from_pretrained(model_id, **_model_kwargs)

        if backend in {"openvino", "ipex"}:
            if task not in VALID_TASKS:
                raise ValueError(
                    f"Got invalid task {task}, "
                    f"currently only {VALID_TASKS} are supported"
                )
            err_msg = f'Backend: {backend} {IMPORT_ERROR.format(f"optimum[{backend}]")}'
            if not is_optimum_intel_available():
                raise ImportError(err_msg)
            # TODO: upgrade _MIN_OPTIMUM_VERSION to 1.22 after release
            min_optimum_version = (
                "1.22"
                if backend == "ipex" and task != "text-generation"
                else _MIN_OPTIMUM_VERSION
            )
            if is_optimum_intel_version("<", min_optimum_version):
                raise ImportError(
                    f"Backend: {backend} requires optimum-intel>="
                    f"{min_optimum_version}. You can install it with pip: "
                    "`pip install --upgrade --upgrade-strategy eager "
                    f"optimum[{backend}]`."
                )
            if backend == "openvino":
                if not is_openvino_available():
                    raise ImportError(err_msg)
                from optimum.intel import (  # type: ignore[import]
                    OVModelForCausalLM,
                    OVModelForSeq2SeqLM,
                )

                model_cls = (
                    OVModelForCausalLM
                    if task == "text-generation"
                    else OVModelForSeq2SeqLM
                )
            else:
                if not is_ipex_available():
                    raise ImportError(err_msg)
                if task == "text-generation":
                    from optimum.intel import (
                        IPEXModelForCausalLM,  # type: ignore[import]
                    )

                    model_cls = IPEXModelForCausalLM
                else:
                    from optimum.intel import (
                        IPEXModelForSeq2SeqLM,  # type: ignore[import]
                    )

                    model_cls = IPEXModelForSeq2SeqLM
        else:
            model_cls = (
                AutoModelForCausalLM
                if task == "text-generation"
                else AutoModelForSeq2SeqLM
            )
        model = model_cls.from_pretrained(model_id, **_model_kwargs)
        # Space-specific tweak: compile the loaded model with torch.compile for faster generation.
        model = torch.compile(model, mode="max-autotune")
        if tokenizer.pad_token is None:
            if model.config.pad_token_id is not None:
                tokenizer.pad_token_id = model.config.pad_token_id
            elif model.config.eos_token_id is not None and isinstance(
                model.config.eos_token_id, int
            ):
                tokenizer.pad_token_id = model.config.eos_token_id
            elif tokenizer.eos_token_id is not None:
                tokenizer.pad_token_id = tokenizer.eos_token_id
            else:
                tokenizer.add_special_tokens({"pad_token": "[PAD]"})

        if (
            (
                getattr(model, "is_loaded_in_4bit", False)
                or getattr(model, "is_loaded_in_8bit", False)
            )
            and device is not None
            and backend == "default"
        ):
            logger.warning(
                f"Setting the `device` argument to None from {device} to avoid "
                "the error caused by attempting to move the model that was already "
                "loaded on the GPU using the Accelerate module to the same or "
                "another device."
            )
            device = None
        if (
            device is not None
            and importlib.util.find_spec("torch") is not None
            and backend == "default"
        ):
            # torch is imported at module level; importing it again inside this
            # function would make `torch` a local name and break the
            # `torch.compile` call above.
            cuda_device_count = torch.cuda.device_count()
            if device < -1 or (device >= cuda_device_count):
                raise ValueError(
                    f"Got device=={device}, "
                    f"device is required to be within [-1, {cuda_device_count})"
                )
            if device_map is not None and device < 0:
                device = None
            if device is not None and device < 0 and cuda_device_count > 0:
                logger.warning(
                    "Device has %d GPUs available. "
                    "Provide device={deviceId} to `from_model_id` to use available "
                    "GPUs for execution. deviceId is -1 (default) for CPU and "
                    "can be a positive integer associated with CUDA device id.",
                    cuda_device_count,
                )
        if device is not None and device_map is not None and backend == "openvino":
            logger.warning("Please set device for OpenVINO through: `model_kwargs`")
if "trust_remote_code" in _model_kwargs: | |
_model_kwargs = { | |
k: v for k, v in _model_kwargs.items() if k != "trust_remote_code" | |
} | |
_pipeline_kwargs = pipeline_kwargs or {} | |
pipeline = hf_pipeline( | |
task=task, | |
model=model, | |
tokenizer=tokenizer, | |
device=device, | |
batch_size=batch_size, | |
model_kwargs=_model_kwargs, | |
**_pipeline_kwargs, | |
) | |
if pipeline.task not in VALID_TASKS: | |
raise ValueError( | |
f"Got invalid task {pipeline.task}, " | |
f"currently only {VALID_TASKS} are supported" | |
) | |
return cls( | |
pipeline=pipeline, | |
model_id=model_id, | |
model_kwargs=_model_kwargs, | |
pipeline_kwargs=_pipeline_kwargs, | |
batch_size=batch_size, | |
**kwargs, | |
) | |
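    # Usage sketch (not executed here): `device=0` targets the first CUDA GPU,
    # while the default `device=None` / `-1` keeps the pipeline on CPU, e.g.
    #   HuggingFacePipeline.from_model_id(
    #       model_id="gpt2",
    #       task="text-generation",
    #       device=0,
    #       pipeline_kwargs={"max_new_tokens": 64},
    #   )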
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_id": self.model_id,
            "model_kwargs": self.model_kwargs,
            "pipeline_kwargs": self.pipeline_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        return "huggingface_pipeline"
    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        # List to hold all results
        text_generations: List[str] = []
        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
        skip_prompt = kwargs.get("skip_prompt", False)

        for i in range(0, len(prompts), self.batch_size):
            batch_prompts = prompts[i : i + self.batch_size]

            # Process batch of prompts
            responses = self.pipeline(
                batch_prompts,
                **pipeline_kwargs,
            )

            # Process each response in the batch
            for j, response in enumerate(responses):
                if isinstance(response, list):
                    # if model returns multiple generations, pick the top one
                    response = response[0]

                if self.pipeline.task == "text-generation":
                    text = response["generated_text"]
                elif self.pipeline.task == "text2text-generation":
                    text = response["generated_text"]
                elif self.pipeline.task == "summarization":
                    text = response["summary_text"]
                elif self.pipeline.task == "translation":
                    text = response["translation_text"]
                else:
                    raise ValueError(
                        f"Got invalid task {self.pipeline.task}, "
                        f"currently only {VALID_TASKS} are supported"
                    )
                if skip_prompt:
                    text = text[len(batch_prompts[j]) :]
                # Append the processed text to results
                text_generations.append(text)

        return LLMResult(
            generations=[[Generation(text=text)] for text in text_generations]
        )
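    # Usage sketch (not executed here): prompts are batched through the pipeline
    # `batch_size` at a time and one Generation is returned per prompt, e.g.
    #   result = llm.generate(["Hello", "Goodbye"])
    #   print(result.generations[0][0].text)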
    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        from threading import Thread

        import torch
        from transformers import (
            StoppingCriteria,
            StoppingCriteriaList,
            TextIteratorStreamer,
        )

        pipeline_kwargs = kwargs.get("pipeline_kwargs", {})
        skip_prompt = kwargs.get("skip_prompt", True)

        if stop is not None:
            stop = self.pipeline.tokenizer.convert_tokens_to_ids(stop)
        stopping_ids_list = stop or []

        class StopOnTokens(StoppingCriteria):
            def __call__(
                self,
                input_ids: torch.LongTensor,
                scores: torch.FloatTensor,
                **kwargs: Any,
            ) -> bool:
                for stop_id in stopping_ids_list:
                    if input_ids[0][-1] == stop_id:
                        return True
                return False

        stopping_criteria = StoppingCriteriaList([StopOnTokens()])

        streamer = TextIteratorStreamer(
            self.pipeline.tokenizer,
            timeout=60.0,
            skip_prompt=skip_prompt,
            skip_special_tokens=True,
        )
        generation_kwargs = dict(
            text_inputs=prompt,
            streamer=streamer,
            stopping_criteria=stopping_criteria,
            **pipeline_kwargs,
        )
        t1 = Thread(target=self.pipeline, kwargs=generation_kwargs)
        t1.start()

        for char in streamer:
            chunk = GenerationChunk(text=char)
            if run_manager:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)

            yield chunk
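    # Streaming sketch (not executed here): `_stream` backs `llm.stream(...)`,
    # which runs generation in a background thread and yields text chunks from
    # the TextIteratorStreamer as they are produced:
    #   for chunk in llm.stream("Tell me a joke"):
    #       print(chunk, end="", flush=True)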
from langchain_core.prompts.chat import ChatPromptTemplate, MessagesPlaceholder

system = '''Respond to the human as helpfully and accurately as possible. You have access to the following tools:
{tools}
Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).
Valid "action" values: "Final Answer" or {tool_names}
Provide only ONE action per $JSON_BLOB, as shown:
```
{{
  "action": $TOOL_NAME,
  "action_input": $INPUT
}}
```
Follow this format:
Question: input question to answer
Thought: consider previous and subsequent steps
Action:
```
$JSON_BLOB
```
Observation: action result
(repeat Thought/Action/Observation N times)
Thought: I know what to respond
Action:
```
{{
  "action": "Final Answer",
  "action_input": "Final response to human"
}}
```
Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation'''

human = '''
{input}
{agent_scratchpad}
(reminder to respond in a JSON blob no matter what)'''
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", human),
    ]
)
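# The system/human templates above follow the structured-chat-agent format that
# `create_structured_chat_agent` expects ({tools}, {tool_names}, {input},
# {agent_scratchpad}, plus an optional chat_history placeholder); it is roughly
# the same prompt as `hub.pull("hwchase17/structured-chat-agent")`.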
from typing import Any, Dict, List, Optional
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.outputs import ChatResult, ChatGeneration
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.callbacks.manager import AsyncCallbackManagerForLLMRun
from langchain_core.runnables import run_in_executor
from transformers import AutoTokenizer, AutoModelForCausalLM
#from transformers import pipeline,AutoModelForCausalLM as M,AutoTokenizer as T
#m=M.from_pretrained("peterpeter8585/syai4.3")
#t=T.from_pretrained("peterpeter8585/syai4.3")
#pipe=pipeline(model=m,tokenizer=t,task="text-generation")

llm = HuggingFacePipeline.from_model_id(model_id="peterpeter8585/deepseek_1", task="text-generation")
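# Quick smoke test (sketch, commented out): the wrapper behaves like any other
# LangChain LLM, e.g.
#   print(llm.invoke("Hello, who are you?"))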
from langchain.retrievers import WikipediaRetriever as Wiki
import gradio as gr

chatbot = gr.Chatbot(
    label="SYAI4.1",
    show_copy_button=True,
    layout="panel"
)
def terminal(c):
    # Run a shell command and return its combined stdout/stderr as text.
    # Note: this hands the agent unrestricted shell access to the Space.
    a = Popen(c, shell=True, stdin=P, stdout=P, stderr=P)
    out, err = a.communicate()
    return (out + err).decode(errors="replace")
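# Tool belt for the agent: file management, a Python REPL, YouTube search,
# raw HTTP requests, a calculator plus DuckDuckGo search, the shell wrapper
# above, and a Korean-Wikipedia retriever tool.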
tools = FMT().get_tools()
tools.append(PYT())
tools.append(YTS())
tools.extend(load_tools(["requests_all"], allow_dangerous_tools=True))
tools.extend(load_tools(["llm-math", "ddg-search"], llm=llm))
tools.append(Tool.from_function(func=terminal, name="terminal", description="Suitable for running terminal commands"))
tools.append(crt(name="wiki", description="Searches Wikipedia and returns the information it finds", retriever=Wiki(lang="ko", top_k_results=1)))
def chat(
    message,
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    chat_session,
):
    # Rebuild the conversation from the Gradio history as LangChain messages.
    messages = [SM(content=system_message + " And, your name is Chatchat.")]
    for val in history:
        if val[0]:
            messages.append(HM(content=val[0]))
        if val[1]:
            messages.append(AM(content=val[1]))
    messages.append(HM(content=message))
    # The prompt exposes a "chat_history" slot, so the memory key must match it
    # (and return messages) for the executor to load and save turns cleanly.
    memory = MEM(memory_key="chat_history", return_messages=True)
    agent = Ex(
        agent=Agent(llm, tools, prompt),
        tools=tools,
        verbose=True,
        handle_parsing_errors=True,
        memory=memory,
    )
    # gr.ChatInterface expects a plain string reply, not the full result dict.
    return agent.invoke({"input": messages})["output"]
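# gr.ChatInterface calls chat(message, history, *additional_inputs), so the
# extra textboxes and sliders below arrive as the trailing positional arguments.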
ai1 = gr.ChatInterface(
    chat,
    chatbot=chatbot,
    additional_inputs=[
        gr.Textbox(value="You are a helpful assistant.", label="System message", interactive=True),
        gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=4.0, value=0.1, step=0.1, label="Temperature"),
        gr.Slider(
            minimum=0.1,
            maximum=1.0,
            value=0.1,
            step=0.05,
            label="Top-p (nucleus sampling)",
        ),
        gr.Textbox(label="chat_id (please enter the chat id!)"),
    ],
)
ai1.launch()