#
# Copyright 2016 The BigDL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This is to make sure that Python is aware there is more than one sub-package within bigdl,
# physically located elsewhere.
# Otherwise there would be a module-not-found error in a non-pip setting, as Python would
# only search the first bigdl package and end up finding only one sub-package.

# This file is adapted from
# https://github.com/hwchase17/langchain/blob/master/langchain/llms/llamacpp.py

# The MIT License

# Copyright (c) Harrison Chase

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import logging
import importlib
from typing import Any, Dict, Generator, List, Optional

from pydantic import Field, root_validator

from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.llms.base import LLM
from .transformersllm import TransformersLLM


class BigdlNativeLLM(LLM):
    """Wrapper around the BigDL-LLM

    Example:
        .. code-block:: python

            from bigdl.llm.langchain.llms import BigdlNativeLLM
            llm = BigdlNativeLLM(model_path="/path/to/llama/model")
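
            # Other supported families (bloom, gptneox, starcoder, chatglm) can be
            # selected via model_family; the path below is illustrative only:
            # bloom_llm = BigdlNativeLLM(model_family="bloom",
            #                            model_path="/path/to/bloom/ggml/model")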
    """


    logging.warning("BigdlNativeLLM has been deprecated, "
                    "please switch to the new LLM API for specific models.")

    model_family: str = "llama"
    """The model family: currently supports llama, gptneox, bloom, starcoder and chatglm."""

    family_info = {
        'llama': {'module': "bigdl.llm.models", 'class': "Llama"},
        'bloom': {'module': "bigdl.llm.models", 'class': "Bloom"},
        'gptneox': {'module': "bigdl.llm.models", 'class': "Gptneox"},
        'starcoder': {'module': "bigdl.llm.models", 'class': "Starcoder"},
        'chatglm': {'module': "bigdl.llm.ggml.model.chatglm", 'class': "ChatGLM"},
    }  #: :meta private:
    """Info necessary for different model families initiation and configure."""

    client: Any  #: :meta private:
    """The actual model."""

    model_path: str
    """Path to the converted BigDL-LLM optimized ggml binary checkpoint."""

    lora_base: Optional[str] = None
    """The path to the Llama LoRA base model."""

    lora_path: Optional[str] = None
    """The path to the Llama LoRA. If None, no LoRa is loaded."""

    n_ctx: int = Field(512, alias="n_ctx")
    """Token context window."""

    n_parts: int = Field(-1, alias="n_parts")
    """Number of parts to split the model into.
    If -1, the number of parts is automatically determined."""

    seed: int = Field(-1, alias="seed")
    """Seed. If -1, a random seed is used."""

    f16_kv: bool = Field(True, alias="f16_kv")
    """Use half-precision for key/value cache."""

    logits_all: bool = Field(False, alias="logits_all")
    """Return logits for all tokens, not just the last token."""

    vocab_only: bool = Field(False, alias="vocab_only")
    """Only load the vocabulary, no weights."""

    use_mlock: bool = Field(False, alias="use_mlock")
    """Force system to keep model in RAM."""

    n_threads: Optional[int] = Field(-1, alias="n_threads")
    """Number of threads to use."""

    n_batch: Optional[int] = Field(512, alias="n_batch")
    """Number of tokens to process in parallel.
    Should be a number between 1 and n_ctx."""

    n_gpu_layers: Optional[int] = Field(0, alias="n_gpu_layers")
    """Number of layers to be loaded into gpu memory. Default None."""

    suffix: Optional[str] = Field(None)
    """A suffix to append to the generated text. If None, no suffix is appended."""

    max_tokens: Optional[int] = 256
    """The maximum number of tokens to generate."""

    temperature: Optional[float] = 0.8
    """The temperature to use for sampling."""

    top_p: Optional[float] = 0.95
    """The top-p value to use for sampling."""

    logprobs: Optional[int] = Field(None)
    """The number of logprobs to return. If None, no logprobs are returned."""

    echo: Optional[bool] = False
    """Whether to echo the prompt."""

    stop: Optional[List[str]] = []
    """A list of strings to stop generation when encountered."""

    repeat_penalty: Optional[float] = 1.1
    """The penalty to apply to repeated tokens."""

    top_k: Optional[int] = 40
    """The top-k value to use for sampling."""

    last_n_tokens_size: Optional[int] = 64
    """The number of tokens to look back when applying the repeat_penalty."""

    use_mmap: Optional[bool] = True
    """Whether to keep the model loaded in RAM"""

    streaming: bool = True
    """Whether to stream the results, token by token."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that bigdl-llm is installed, family is supported"""

        model_path = values["model_path"]
        model_param_names = [
            "lora_path",
            "lora_base",
            "n_ctx",
            "n_parts",
            "seed",
            "f16_kv",
            "logits_all",
            "vocab_only",
            "use_mlock",
            "n_threads",
            "n_batch",
            "use_mmap",
            "last_n_tokens_size",
        ]
        model_params = {k: values[k] for k in model_param_names}
        # For backwards compatibility, only include if non-null.
        if values["n_gpu_layers"] is not None:
            model_params["n_gpu_layers"] = values["n_gpu_layers"]

        model_family = values["model_family"].lower()
        if model_family not in values["family_info"]:
            raise ValueError(
                "Model family '%s' is not supported. Valid values are %s."
                % (values["model_family"], ", ".join(values["family_info"].keys()))
            )

        try:
            b_info = values["family_info"][model_family]
            module = importlib.import_module(b_info['module'])
            class_ = getattr(module, b_info['class'])

            values["client"] = class_(model_path, **model_params)

        except ImportError:
            raise ModuleNotFoundError(
                "Could not import bigdl-llm library. "
                "Please install the bigdl-llm library to "
                "use this embedding model: pip install bigdl-llm"
            )
        except Exception as e:
            raise ValueError(
                f"Could not load model from path: {model_path}. "
                f"Please make sure the model family {model_family} matches "
                "the model you want to load. "
                f"Received error: {e}"
            )

        return values

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling llama_cpp."""
        return {
            "suffix": self.suffix,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "logprobs": self.logprobs,
            "echo": self.echo,
            "stop_sequences": self.stop,  # key here is convention among LLM classes
            "repeat_penalty": self.repeat_penalty,
            "top_k": self.top_k,
        }

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_path": self.model_path,
                   "model_family": self.model_family},
                **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "BigDL"

    def _get_parameters(self, stop: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Performs sanity checks and prepares the parameters in the format needed by llama_cpp.

        Args:
            stop (Optional[List[str]]): List of stop sequences for llama_cpp.

        Returns:
            Dictionary containing the combined parameters.
        """

        # Raise error if stop sequences are in both input and default params
        if self.stop and stop is not None:
            raise ValueError("`stop` found in both the input and default params.")

        params = self._default_params

        # llama_cpp expects the "stop" key not this, so we remove it:
        params.pop("stop_sequences")

        # then set it to the configured value, or default to an empty list:
        params["stop"] = self.stop or stop or []

        return params

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> str:
        """Call the Llama model and return the output.

        Args:
            prompt: The prompt to use for generation.
            stop: A list of strings to stop generation when encountered.

        Returns:
            The generated text.

        Example:
            .. code-block:: python

                from bigdl.llm.langchain.llms import BigdlNativeLLM
                llm = BigdlNativeLLM(model_path="/path/to/local/llama/model.bin")
                llm("This is a prompt.")
        """
        if self.streaming:
            # If streaming is enabled, we use the stream
            # method that yields tokens as they are generated,
            # and return the combined text from each chunk's first choice:
            combined_text_output = ""
            for token in self.stream(prompt=prompt, stop=stop, run_manager=run_manager):
                combined_text_output += token["choices"][0]["text"]
            return combined_text_output
        else:
            params = self._get_parameters(stop)
            result = self.client(prompt=prompt, **params)
            return result["choices"][0]["text"]

    def stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> Generator[Dict, None, None]:
        """Yields results objects as they are generated in real time.

        BETA: this is a beta feature while we figure out the right abstraction.
        Once that happens, this interface could change.

        It also calls the callback manager's on_llm_new_token event with
        similar parameters to the OpenAI LLM class method of the same name.

        Args:
            prompt: The prompts to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            A generator representing the stream of tokens being generated.

        Yields:
            Dictionary-like objects containing a string token and metadata.
            See the llama-cpp-python docs and the example below for more.

        Example:
            .. code-block:: python

                from bigdl.llm.langchain.llms import BigdlNativeLLM
                llm = BigdlNativeLLM(
                    model_path="/path/to/local/model.bin",
                    temperature = 0.5
                )
                for chunk in llm.stream("Ask 'Hi, how are you?' like a pirate:'",
                        stop=["'","\\n"]):
                    result = chunk["choices"][0]
                    print(result["text"], end='', flush=True)

        """
        params = self._get_parameters(stop)
        result = self.client(prompt=prompt, stream=True, **params)
        for chunk in result:
            token = chunk["choices"][0]["text"]
            log_probs = chunk["choices"][0].get("logprobs", None)
            if run_manager:
                run_manager.on_llm_new_token(
                    token=token, verbose=self.verbose, log_probs=log_probs
                )
            yield chunk

    def get_num_tokens(self, text: str) -> int:
        """Get the number of tokens present in the text."""
        tokenized_text = self.client.tokenize(text.encode("utf-8"))
        return len(tokenized_text)
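
# BigdlNativeLLM above is deprecated in favour of the per-model classes defined
# at the end of this module (LlamaLLM, BloomLLM, GptneoxLLM, ChatGLMLLM,
# StarcoderLLM). A rough migration sketch (the checkpoint path is illustrative):
#
#     # before
#     llm = BigdlNativeLLM(model_family="llama", model_path="/path/to/ggml/model")
#     # after
#     llm = LlamaLLM(model_path="/path/to/ggml/model")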


class _BaseCausalLM(LLM):
    """Wrapper around the BigDL-LLM

    Example:
        .. code-block:: python

            from bigdl.llm.langchain.llms import LlamaLLM
            llm = LlamaLLM(model_path="/path/to/llama/model")
    """


    ggml_model: str = None
    ggml_module: str = None

    native: bool = True
    """Load model to either BigDL-LLM optimized Transformers or Native (ggml) int4."""

    client: Any  #: :meta private:
    """The actual model."""

    model_path: str
    """Path to the loading model file.
    If native, the path shoule be converted BigDL-LLM optimized ggml binary checkpoint.
    If transformers, the path should be the huggingface repo id to be downloaded
    or the huggingface checkpoint folder."""

    model_kwargs: Optional[dict] = None
    """Key word arguments passed to the Transformers model."""

    kwargs: Any
    """Additional key word arguments passed to TransformersLLM."""

    lora_base: Optional[str] = None
    """The path to the Llama LoRA base model."""

    lora_path: Optional[str] = None
    """The path to the Llama LoRA. If None, no LoRa is loaded."""

    n_ctx: int = Field(512, alias="n_ctx")
    """Token context window."""

    n_parts: int = Field(-1, alias="n_parts")
    """Number of parts to split the model into.
    If -1, the number of parts is automatically determined."""

    seed: int = Field(-1, alias="seed")
    """Seed. If -1, a random seed is used."""

    f16_kv: bool = Field(True, alias="f16_kv")
    """Use half-precision for key/value cache."""

    logits_all: bool = Field(False, alias="logits_all")
    """Return logits for all tokens, not just the last token."""

    vocab_only: bool = Field(False, alias="vocab_only")
    """Only load the vocabulary, no weights."""

    use_mlock: bool = Field(False, alias="use_mlock")
    """Force system to keep model in RAM."""

    n_threads: Optional[int] = Field(2, alias="n_threads")
    """Number of threads to use."""

    n_batch: Optional[int] = Field(512, alias="n_batch")
    """Number of tokens to process in parallel.
    Should be a number between 1 and n_ctx."""

    n_gpu_layers: Optional[int] = Field(0, alias="n_gpu_layers")
    """Number of layers to be loaded into gpu memory. Default None."""

    suffix: Optional[str] = Field(None)
    """A suffix to append to the generated text. If None, no suffix is appended."""

    max_tokens: Optional[int] = 256
    """The maximum number of tokens to generate."""

    temperature: Optional[float] = 0.8
    """The temperature to use for sampling."""

    top_p: Optional[float] = 0.95
    """The top-p value to use for sampling."""

    logprobs: Optional[int] = Field(None)
    """The number of logprobs to return. If None, no logprobs are returned."""

    echo: Optional[bool] = False
    """Whether to echo the prompt."""

    stop: Optional[List[str]] = []
    """A list of strings to stop generation when encountered."""

    repeat_penalty: Optional[float] = 1.1
    """The penalty to apply to repeated tokens."""

    top_k: Optional[int] = 40
    """The top-k value to use for sampling."""

    last_n_tokens_size: Optional[int] = 64
    """The number of tokens to look back when applying the repeat_penalty."""

    use_mmap: Optional[bool] = True
    """Whether to keep the model loaded in RAM"""

    streaming: bool = True
    """Whether to stream the results, token by token."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that bigdl-llm is installed, family is supported"""  

        native = values["native"]
        model_path = values["model_path"]
        model_kwargs = values["model_kwargs"]
        kwargs = values["kwargs"]
        model_param_names = [
            "lora_path",
            "lora_base",
            "n_ctx",
            "n_parts",
            "seed",
            "f16_kv",
            "logits_all",
            "vocab_only",
            "use_mlock",
            "n_threads",
            "n_batch",
            "use_mmap",
            "last_n_tokens_size",
        ]
        model_params = {k: values[k] for k in model_param_names}
        # For backwards compatibility, only include if non-null.
        if values["n_gpu_layers"] is not None:
            model_params["n_gpu_layers"] = values["n_gpu_layers"]

        try:
            module = importlib.import_module(values["ggml_module"])
            class_ = getattr(module, values["ggml_model"])
            if native:
                values["client"] = class_(model_path, **model_params)
            else:
                kwargs = {} if kwargs is None else kwargs
                values["client"] = TransformersLLM.from_model_id(model_path, model_kwargs,
                                                                 **kwargs)

        except ImportError:
            raise ModuleNotFoundError(
                "Could not import bigdl-llm library. "
                "Please install the bigdl-llm library to "
                "use this embedding model: pip install bigdl-llm"
            )
        except Exception as e:
            raise ValueError(
                f"Could not load model from path: {model_path}. "
                "Please make sure the model class matches "
                "the model you want to load. "
                f"Received error: {e}"
            )

        return values

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling llama_cpp."""
        return {
            "suffix": self.suffix,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "top_p": self.top_p,
            "logprobs": self.logprobs,
            "echo": self.echo,
            "stop_sequences": self.stop,  # key here is convention among LLM classes
            "repeat_penalty": self.repeat_penalty,
            "top_k": self.top_k,
        }

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_path": self.model_path},
                **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "BigDL"

    def _get_parameters(self, stop: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Performs sanity checks and prepares the parameters in the format needed by llama_cpp.

        Args:
            stop (Optional[List[str]]): List of stop sequences for llama_cpp.

        Returns:
            Dictionary containing the combined parameters.
        """

        # Raise error if stop sequences are in both input and default params
        if self.stop and stop is not None:
            raise ValueError("`stop` found in both the input and default params.")

        params = self._default_params

        # llama_cpp expects the "stop" key not this, so we remove it:
        params.pop("stop_sequences")

        # then set it to the configured value, or default to an empty list:
        params["stop"] = self.stop or stop or []

        return params

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs
    ) -> str:
        """Call the Llama model and return the output.

        Args:
            prompt: The prompt to use for generation.
            stop: A list of strings to stop generation when encountered.

        Returns:
            The generated text.

        Example:
            .. code-block:: python

                from bigdl.llm.langchain.llms import LlamaLLM
                llm = LlamaLLM(model_path="/path/to/local/llama/model.bin")
                llm("This is a prompt.")
        """
        if self.native:
            if self.streaming:
                # If streaming is enabled, we use the stream
                # method that yields tokens as they are generated,
                # and return the combined text from each chunk's first choice:
                combined_text_output = ""
                for token in self.stream(prompt=prompt, stop=stop, run_manager=run_manager):
                    combined_text_output += token["choices"][0]["text"]
                return combined_text_output
            else:
                params = self._get_parameters(stop)
                result = self.client(prompt=prompt, **params)
                return result["choices"][0]["text"]
        else:
            return self.client._call(prompt, stop, run_manager, **kwargs)

    def stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
    ) -> Generator[Dict, None, None]:
        """Yields results objects as they are generated in real time.

        BETA: this is a beta feature while we figure out the right abstraction.
        Once that happens, this interface could change.

        It also calls the callback manager's on_llm_new_token event with
        similar parameters to the OpenAI LLM class method of the same name.

        Args:
            prompt: The prompts to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            A generator representing the stream of tokens being generated.

        Yields:
            Dictionary-like objects containing a string token and metadata.
            See the llama-cpp-python docs and the example below for more.

        Example:
            .. code-block:: python

                from bigdl.llm.langchain.llms import LlamaLLM
                llm = LlamaLLM(
                    model_path="/path/to/local/model.bin",
                    temperature = 0.5
                )
                for chunk in llm.stream("Ask 'Hi, how are you?' like a pirate:'",
                        stop=["'","\\n"]):
                    result = chunk["choices"][0]
                    print(result["text"], end='', flush=True)

        """
        params = self._get_parameters(stop)
        result = self.client(prompt=prompt, stream=True, **params)
        for chunk in result:
            token = chunk["choices"][0]["text"]
            log_probs = chunk["choices"][0].get("logprobs", None)
            if run_manager:
                run_manager.on_llm_new_token(
                    token=token, verbose=self.verbose, log_probs=log_probs
                )
            yield chunk

    def get_num_tokens(self, text: str) -> int:
        """Get the number of tokens that present in the text.

        Useful for checking if an input will fit in a model's context window.

        Args:
            text: The string input to tokenize.

        Returns:
            The number of tokens in the text.
        """
        tokenized_text = self.client.tokenize(text.encode("utf-8"))
        return len(tokenized_text)


class LlamaLLM(_BaseCausalLM):
    ggml_model = "Llama"
    ggml_module = "bigdl.llm.models"


class BloomLLM(_BaseCausalLM):
    ggml_model = "Bloom"
    ggml_module = "bigdl.llm.models"


class GptneoxLLM(_BaseCausalLM):
    ggml_model = "Gptneox"
    ggml_module = "bigdl.llm.models"


class ChatGLMLLM(_BaseCausalLM):
    ggml_model = "ChatGLM"
    ggml_module = "bigdl.llm.ggml.model.chatglm"


class StarcoderLLM(_BaseCausalLM):
    ggml_model = "Starcoder"
    ggml_module = "bigdl.llm.models"
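

# A minimal usage sketch for these classes (paths and keyword arguments below are
# illustrative assumptions, not shipped defaults):
#
#     from bigdl.llm.langchain.llms import LlamaLLM
#
#     # Native (ggml) path: point at a converted BigDL-LLM int4 checkpoint.
#     llm = LlamaLLM(model_path="/path/to/converted/ggml/model.bin", n_threads=4)
#     print(llm("What is AI?"))
#
#     # Transformers path: pass a Hugging Face repo id or checkpoint folder and
#     # set native=False; model_kwargs are forwarded to the underlying model.
#     llm = LlamaLLM(model_path="/path/to/huggingface/checkpoint",
#                    native=False, model_kwargs={"trust_remote_code": True})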