anpigon's picture
add langchain docs
ed4d993
raw
history blame
14.2 kB
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Mapping, Optional, Sequence, TypedDict
import aiohttp
import requests
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import BaseLLM
from langchain_core.outputs import Generation, LLMResult
from langchain_core.pydantic_v1 import Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env
from langchain_community.llms.utils import enforce_stop_tokens
class TrainResult(TypedDict):
"""Train result."""
loss: float
class GradientLLM(BaseLLM):
"""Gradient.ai LLM Endpoints.
GradientLLM is a class to interact with LLMs on gradient.ai
To use, set the environment variable ``GRADIENT_ACCESS_TOKEN`` with your
API token and ``GRADIENT_WORKSPACE_ID`` for your gradient workspace,
or alternatively provide them as keywords to the constructor of this class.
Example:
.. code-block:: python
from langchain_community.llms import GradientLLM
GradientLLM(
model="99148c6d-c2a0-4fbe-a4a7-e7c05bdb8a09_base_ml_model",
model_kwargs={
"max_generated_token_count": 128,
"temperature": 0.75,
"top_p": 0.95,
"top_k": 20,
"stop": [],
},
gradient_workspace_id="12345614fc0_workspace",
gradient_access_token="gradientai-access_token",
)
"""
model_id: str = Field(alias="model", min_length=2)
"Underlying gradient.ai model id (base or fine-tuned)."
gradient_workspace_id: Optional[str] = None
"Underlying gradient.ai workspace_id."
gradient_access_token: Optional[str] = None
"""gradient.ai API Token, which can be generated by going to
https://auth.gradient.ai/select-workspace
and selecting "Access tokens" under the profile drop-down.
"""
model_kwargs: Optional[dict] = None
"""Keyword arguments to pass to the model."""
gradient_api_url: str = "https://api.gradient.ai/api"
"""Endpoint URL to use."""
aiosession: Optional[aiohttp.ClientSession] = None #: :meta private:
"""ClientSession, private, subject to change in upcoming releases."""
# LLM call kwargs
class Config:
"""Configuration for this pydantic object."""
allow_population_by_field_name = True
extra = Extra.forbid
@root_validator(allow_reuse=True)
def validate_environment(cls, values: Dict) -> Dict:
"""Validate that api key and python package exists in environment."""
values["gradient_access_token"] = get_from_dict_or_env(
values, "gradient_access_token", "GRADIENT_ACCESS_TOKEN"
)
values["gradient_workspace_id"] = get_from_dict_or_env(
values, "gradient_workspace_id", "GRADIENT_WORKSPACE_ID"
)
if (
values["gradient_access_token"] is None
or len(values["gradient_access_token"]) < 10
):
raise ValueError("env variable `GRADIENT_ACCESS_TOKEN` must be set")
if (
values["gradient_workspace_id"] is None
or len(values["gradient_access_token"]) < 3
):
raise ValueError("env variable `GRADIENT_WORKSPACE_ID` must be set")
if values["model_kwargs"]:
kw = values["model_kwargs"]
if not 0 <= kw.get("temperature", 0.5) <= 1:
raise ValueError("`temperature` must be in the range [0.0, 1.0]")
if not 0 <= kw.get("top_p", 0.5) <= 1:
raise ValueError("`top_p` must be in the range [0.0, 1.0]")
if 0 >= kw.get("top_k", 0.5):
raise ValueError("`top_k` must be positive")
if 0 >= kw.get("max_generated_token_count", 1):
raise ValueError("`max_generated_token_count` must be positive")
values["gradient_api_url"] = get_from_dict_or_env(
values, "gradient_api_url", "GRADIENT_API_URL"
)
try:
import gradientai # noqa
except ImportError:
logging.warning(
"DeprecationWarning: `GradientLLM` will use "
"`pip install gradientai` in future releases of langchain."
)
except Exception:
pass
return values
@property
def _identifying_params(self) -> Mapping[str, Any]:
"""Get the identifying parameters."""
_model_kwargs = self.model_kwargs or {}
return {
**{"gradient_api_url": self.gradient_api_url},
**{"model_kwargs": _model_kwargs},
}
@property
def _llm_type(self) -> str:
"""Return type of llm."""
return "gradient"
def _kwargs_post_fine_tune_request(
self, inputs: Sequence[str], kwargs: Mapping[str, Any]
) -> Mapping[str, Any]:
"""Build the kwargs for the Post request, used by sync
Args:
prompt (str): prompt used in query
kwargs (dict): model kwargs in payload
Returns:
Dict[str, Union[str,dict]]: _description_
"""
_model_kwargs = self.model_kwargs or {}
_params = {**_model_kwargs, **kwargs}
multipliers = _params.get("multipliers", None)
return dict(
url=f"{self.gradient_api_url}/models/{self.model_id}/fine-tune",
headers={
"authorization": f"Bearer {self.gradient_access_token}",
"x-gradient-workspace-id": f"{self.gradient_workspace_id}",
"accept": "application/json",
"content-type": "application/json",
},
json=dict(
samples=tuple(
{
"inputs": input,
}
for input in inputs
)
if multipliers is None
else tuple(
{
"inputs": input,
"fineTuningParameters": {
"multiplier": multiplier,
},
}
for input, multiplier in zip(inputs, multipliers)
),
),
)
def _kwargs_post_request(
self, prompt: str, kwargs: Mapping[str, Any]
) -> Mapping[str, Any]:
"""Build the kwargs for the Post request, used by sync
Args:
prompt (str): prompt used in query
kwargs (dict): model kwargs in payload
Returns:
Dict[str, Union[str,dict]]: _description_
"""
_model_kwargs = self.model_kwargs or {}
_params = {**_model_kwargs, **kwargs}
return dict(
url=f"{self.gradient_api_url}/models/{self.model_id}/complete",
headers={
"authorization": f"Bearer {self.gradient_access_token}",
"x-gradient-workspace-id": f"{self.gradient_workspace_id}",
"accept": "application/json",
"content-type": "application/json",
},
json=dict(
query=prompt,
maxGeneratedTokenCount=_params.get("max_generated_token_count", None),
temperature=_params.get("temperature", None),
topK=_params.get("top_k", None),
topP=_params.get("top_p", None),
),
)
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Call to Gradients API `model/{id}/complete`.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
"""
try:
response = requests.post(**self._kwargs_post_request(prompt, kwargs))
if response.status_code != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status_code}: {response.text}"
)
except requests.exceptions.RequestException as e:
raise Exception(f"RequestException while calling Gradient Endpoint: {e}")
text = response.json()["generatedOutput"]
if stop is not None:
# Apply stop tokens when making calls to Gradient
text = enforce_stop_tokens(text, stop)
return text
async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""Async Call to Gradients API `model/{id}/complete`.
Args:
prompt: The prompt to pass into the model.
stop: Optional list of stop words to use when generating.
Returns:
The string generated by the model.
"""
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.post(
**self._kwargs_post_request(prompt=prompt, kwargs=kwargs)
) as response:
if response.status != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status}: {response.text}"
)
text = (await response.json())["generatedOutput"]
else:
async with self.aiosession.post(
**self._kwargs_post_request(prompt=prompt, kwargs=kwargs)
) as response:
if response.status != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status}: {response.text}"
)
text = (await response.json())["generatedOutput"]
if stop is not None:
# Apply stop tokens when making calls to Gradient
text = enforce_stop_tokens(text, stop)
return text
def _generate(
self,
prompts: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> LLMResult:
"""Run the LLM on the given prompt and input."""
# same thing with threading
def _inner_generate(prompt: str) -> List[Generation]:
return [
Generation(
text=self._call(
prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
)
)
]
if len(prompts) <= 1:
generations = list(map(_inner_generate, prompts))
else:
with ThreadPoolExecutor(min(8, len(prompts))) as p:
generations = list(p.map(_inner_generate, prompts))
return LLMResult(generations=generations)
async def _agenerate(
self,
prompts: List[str],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> LLMResult:
"""Run the LLM on the given prompt and input."""
generations = []
for generation in asyncio.gather(
[self._acall(prompt, stop=stop, run_manager=run_manager, **kwargs)]
for prompt in prompts
):
generations.append([Generation(text=generation)])
return LLMResult(generations=generations)
def train_unsupervised(
self,
inputs: Sequence[str],
**kwargs: Any,
) -> TrainResult:
try:
response = requests.post(
**self._kwargs_post_fine_tune_request(inputs, kwargs)
)
if response.status_code != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status_code}: {response.text}"
)
except requests.exceptions.RequestException as e:
raise Exception(f"RequestException while calling Gradient Endpoint: {e}")
response_json = response.json()
loss = response_json["sumLoss"] / response_json["numberOfTrainableTokens"]
return TrainResult(loss=loss)
async def atrain_unsupervised(
self,
inputs: Sequence[str],
**kwargs: Any,
) -> TrainResult:
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.post(
**self._kwargs_post_fine_tune_request(inputs, kwargs)
) as response:
if response.status != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status}: {response.text}"
)
response_json = await response.json()
loss = (
response_json["sumLoss"]
/ response_json["numberOfTrainableTokens"]
)
else:
async with self.aiosession.post(
**self._kwargs_post_fine_tune_request(inputs, kwargs)
) as response:
if response.status != 200:
raise Exception(
f"Gradient returned an unexpected response with status "
f"{response.status}: {response.text}"
)
response_json = await response.json()
loss = (
response_json["sumLoss"] / response_json["numberOfTrainableTokens"]
)
return TrainResult(loss=loss)