Source code for bentoml._internal.io_descriptors.numpy

from __future__ import annotations

import json
import logging
import typing as t
from functools import lru_cache

from starlette.requests import Request
from starlette.responses import Response

from ...exceptions import BadInput
from ...exceptions import InvalidArgument
from ...exceptions import UnprocessableEntity
from ...grpc.utils import LATEST_PROTOCOL_VERSION
from ...grpc.utils import import_generated_stubs
from ..service.openapi import SUCCESS_DESCRIPTION
from ..service.openapi.specification import MediaType
from ..service.openapi.specification import Schema
from ..types import LazyType
from ..utils import LazyLoader
from ..utils.http import set_cookies
from .base import IODescriptor

if t.TYPE_CHECKING:
    import numpy as np
    import pyarrow
    import pyspark.sql.types
    from google.protobuf import message as _message

    from bentoml.grpc.v1 import service_pb2 as pb
    from bentoml.grpc.v1alpha1 import service_pb2 as pb_v1alpha1

    from .. import external_typing as ext
    from ..context import ServiceContext as Context
    from .base import OpenAPIResponse
else:
    pb, _ = import_generated_stubs("v1")
    pb_v1alpha1, _ = import_generated_stubs("v1alpha1")
    np = LazyLoader("np", globals(), "numpy")

logger = logging.getLogger(__name__)


# TODO: support the following types for protobuf messages:
# - support complex64, complex128, object and struct types
# - BFLOAT16, QINT32, QINT16, QUINT16, QINT8, QUINT8
#
# For int16, uint16, int8, uint8 -> specify types in NumpyNdarray + using int_values.
#
# For bfloat16, half (float16) -> specify types in NumpyNdarray + using float_values.
#
# For string_values, use <U for np.dtype instead of S (zero-terminated bytes).
FIELDPB_TO_NPDTYPE_NAME_MAP = {
    "bool_values": "bool",
    "float_values": "float32",
    "string_values": "<U",
    "double_values": "float64",
    "int32_values": "int32",
    "int64_values": "int64",
    "uint32_values": "uint32",
    "uint64_values": "uint64",
}
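
# Illustrative sketch (not part of the module): the map above pairs each
# ``NDArray`` protobuf field name with the numpy dtype name it decodes to,
# so a handler can do, for example:
#
#     import numpy as np
#     dtype = np.dtype(FIELDPB_TO_NPDTYPE_NAME_MAP["float_values"])
#     assert dtype == np.dtype("float32")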


@t.overload
def dtypepb_to_npdtype_map(
    version: t.Literal["v1"] = ...,
) -> dict[pb.NDArray.DType.ValueType, ext.NpDTypeLike]:
    ...


@t.overload
def dtypepb_to_npdtype_map(
    version: t.Literal["v1alpha1"] = ...,
) -> dict[pb_v1alpha1.NDArray.DType.ValueType, ext.NpDTypeLike]:
    ...


@lru_cache(maxsize=2)
def dtypepb_to_npdtype_map(
    version: str = LATEST_PROTOCOL_VERSION,
) -> dict[int, ext.NpDTypeLike]:
    pb, _ = import_generated_stubs(version)
    # pb.NDArray.DType -> np.dtype
    return {
        pb.NDArray.DTYPE_FLOAT: np.dtype("float32"),
        pb.NDArray.DTYPE_DOUBLE: np.dtype("double"),
        pb.NDArray.DTYPE_INT32: np.dtype("int32"),
        pb.NDArray.DTYPE_INT64: np.dtype("int64"),
        pb.NDArray.DTYPE_UINT32: np.dtype("uint32"),
        pb.NDArray.DTYPE_UINT64: np.dtype("uint64"),
        pb.NDArray.DTYPE_BOOL: np.dtype("bool"),
        pb.NDArray.DTYPE_STRING: np.dtype("<U"),
    }
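
# Illustrative sketch (not part of the module): translating the wire-level
# dtype enum of an incoming message into a numpy dtype before deserializing:
#
#     pb, _ = import_generated_stubs("v1")
#     npdtype = dtypepb_to_npdtype_map("v1")[pb.NDArray.DTYPE_DOUBLE]
#     assert npdtype == np.dtype("float64")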


@t.overload
def dtypepb_to_fieldpb_map(
    version: t.Literal["v1"] = ...,
) -> dict[pb.NDArray.DType.ValueType, str]:
    ...


@t.overload
def dtypepb_to_fieldpb_map(
    version: t.Literal["v1alpha1"] = ...,
) -> dict[pb_v1alpha1.NDArray.DType.ValueType, str]:
    ...


@lru_cache(maxsize=2)
def dtypepb_to_fieldpb_map(version: str = LATEST_PROTOCOL_VERSION) -> dict[int, str]:
    return {
        k: npdtype_to_fieldpb_map()[v]
        for k, v in dtypepb_to_npdtype_map(version).items()
    }


@lru_cache(maxsize=1)
def fieldpb_to_npdtype_map() -> dict[str, ext.NpDTypeLike]:
    # str -> np.dtype
    return {k: np.dtype(v) for k, v in FIELDPB_TO_NPDTYPE_NAME_MAP.items()}


@t.overload
def npdtype_to_dtypepb_map(
    version: t.Literal["v1"] = ...,
) -> dict[ext.NpDTypeLike, pb.NDArray.DType.ValueType]:
    ...


@t.overload
def npdtype_to_dtypepb_map(
    version: t.Literal["v1alpha1"] = ...,
) -> dict[ext.NpDTypeLike, pb_v1alpha1.NDArray.DType.ValueType]:
    ...


@lru_cache(maxsize=2)
def npdtype_to_dtypepb_map(
    version: str = LATEST_PROTOCOL_VERSION,
) -> dict[ext.NpDTypeLike, int]:
    # np.dtype -> pb.NDArray.DType
    return {v: k for k, v in dtypepb_to_npdtype_map(version).items()}


@lru_cache(maxsize=1)
def npdtype_to_fieldpb_map() -> dict[ext.NpDTypeLike, str]:
    # np.dtype -> str
    return {v: k for k, v in fieldpb_to_npdtype_map().items()}
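
# Illustrative sketch (not part of the module): together, the two reverse maps
# above provide everything needed to serialize an array, mirroring what
# ``NumpyNdarray.to_proto`` does below:
#
#     arr = np.array([1, 2, 3], dtype="int32")
#     field_name = npdtype_to_fieldpb_map()[arr.dtype]      # "int32_values"
#     dtype_enum = npdtype_to_dtypepb_map("v1")[arr.dtype]  # pb.NDArray.DTYPE_INT32
#     msg = pb.NDArray(
#         dtype=dtype_enum,
#         shape=tuple(arr.shape),
#         **{field_name: arr.ravel().tolist()},
#     )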


def _is_matched_shape(
    left: tuple[int, ...] | None, right: tuple[int, ...] | None
) -> bool:
    if (left is None) or (right is None):
        return False

    if len(left) != len(right):
        return False

    for i, j in zip(left, right):
        if i == -1 or j == -1:
            continue
        if i == j:
            continue
        return False
    return True
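
# Illustrative sketch (not part of the module): ``-1`` acts as a wildcard on
# either side, so a descriptor shape of (-1, 4) accepts any batch size:
#
#     assert _is_matched_shape((-1, 4), (16, 4))
#     assert not _is_matched_shape((-1, 4), (16, 5))
#     assert not _is_matched_shape((-1, 4), (16, 4, 1))  # rank must match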


# TODO: when updating docs, add examples with gRPCurl
class NumpyNdarray(
    IODescriptor["ext.NpNDArray"],
    descriptor_id="bentoml.io.NumpyNdarray",
    proto_fields=("ndarray",),
):
    """
    :obj:`NumpyNdarray` defines API specification for the inputs/outputs of a Service, where
    either inputs will be converted to or outputs will be converted from type
    :code:`numpy.ndarray` as specified in your API function signature.

    A sample service implementation:

    .. code-block:: python
       :caption: `service.py`

       from __future__ import annotations

       from typing import TYPE_CHECKING, Any

       import bentoml
       from bentoml.io import NumpyNdarray

       if TYPE_CHECKING:
           from numpy.typing import NDArray

       runner = bentoml.sklearn.get("sklearn_model_clf").to_runner()

       svc = bentoml.Service("iris-classifier", runners=[runner])

       @svc.api(input=NumpyNdarray(), output=NumpyNdarray())
       def predict(input_arr: NDArray[Any]) -> NDArray[Any]:
           return runner.run(input_arr)

    Users can then serve this service with :code:`bentoml serve`:

    .. code-block:: bash

       % bentoml serve ./service.py:svc --reload

    Users can then send requests to the newly started service with any client:

    .. tab-set::

       .. tab-item:: Bash

          .. code-block:: bash

             % curl -X POST -H "Content-Type: application/json" \\
                    --data '[[5,4,3,2]]' http://0.0.0.0:3000/predict

             # [1]%

       .. tab-item:: Python

          .. code-block:: python
             :caption: `request.py`

             import requests

             requests.post(
                 "http://0.0.0.0:3000/predict",
                 headers={"content-type": "application/json"},
                 data='[{"0":5,"1":4,"2":3,"3":2}]'
             ).text

    Args:
        dtype: Data type users wish to convert their inputs/outputs to.
               Refer to `arrays dtypes <https://numpy.org/doc/stable/reference/arrays.dtypes.html>`_
               for more information.
        enforce_dtype: Whether to enforce a certain data type. If :code:`enforce_dtype=True`
                       then :code:`dtype` must be specified.
        shape: Given shape that an array will be converted to. For example:

               .. code-block:: python
                  :caption: `service.py`

                  from bentoml.io import NumpyNdarray

                  @svc.api(input=NumpyNdarray(shape=(2,2), enforce_shape=False), output=NumpyNdarray())
                  async def predict(input_array: np.ndarray) -> np.ndarray:
                      # input_array will be reshaped to (2,2)
                      result = await runner.run(input_array)

               When ``enforce_shape=True`` is provided, BentoML will raise an exception if the
               input array received does not match the ``shape`` provided.

               .. dropdown:: About the behaviour of ``shape``
                  :icon: triangle-down

                  If specified, then both :meth:`bentoml.io.NumpyNdarray.from_http_request` and
                  :meth:`bentoml.io.NumpyNdarray.from_proto` will reshape the input array before
                  sending it to the API function.
        enforce_shape: Whether to enforce a certain shape. If ``enforce_shape=True`` then
                       ``shape`` must be specified.

    Returns:
        :obj:`~bentoml._internal.io_descriptors.IODescriptor`: IO Descriptor that represents a :code:`np.ndarray`.
    """

    _mime_type = "application/json"

    def __init__(
        self,
        dtype: str | ext.NpDTypeLike | None = None,
        enforce_dtype: bool = False,
        shape: tuple[int, ...] | None = None,
        enforce_shape: bool = False,
    ):
        if dtype and not isinstance(dtype, np.dtype):
            # Convert from a primitive type or a type string, e.g.
            # np.dtype(float) or np.dtype("float64").
            try:
                dtype = np.dtype(dtype)
            except TypeError as e:
                raise UnprocessableEntity(f'Invalid dtype "{dtype}": {e}') from None

        self._dtype = dtype
        self._shape = shape
        self._enforce_dtype = enforce_dtype
        self._enforce_shape = enforce_shape

    def _openapi_types(self) -> str:
        # Convert numpy dtypes to OpenAPI-compatible types.
        var_type = "integer"
        if self._dtype:
            name: str = self._dtype.name
            if name.startswith("float") or name.startswith("complex"):
                var_type = "number"
        return var_type

    def input_type(self) -> LazyType[ext.NpNDArray]:
        return LazyType("numpy", "ndarray")

    def to_spec(self) -> dict[str, t.Any]:
        return {
            "id": self.descriptor_id,
            "args": {
                "dtype": None if self._dtype is None else self._dtype.name,
                "shape": self._shape,
                "enforce_dtype": self._enforce_dtype,
                "enforce_shape": self._enforce_shape,
            },
        }

    @classmethod
    def from_spec(cls, spec: dict[str, t.Any]) -> t.Self:
        if "args" not in spec:
            raise InvalidArgument(f"Missing args key in NumpyNdarray spec: {spec}")
        res = NumpyNdarray(**spec["args"])
        return res

    def openapi_schema(self) -> Schema:
        # Note that we do not yet provide schema support for arrays with
        # more than two dimensions.
        items = Schema(type=self._openapi_types())
        if self._shape and len(self._shape) > 1:
            items = Schema(type="array", items=Schema(type=self._openapi_types()))
        return Schema(type="array", items=items, nullable=True)

    def openapi_components(self) -> dict[str, t.Any] | None:
        pass

    def openapi_example(self):
        if self.sample is not None:
            if isinstance(self.sample, np.generic):
                raise BadInput(
                    "NumpyNdarray: sample must be a numpy array."
                ) from None
            return self.sample.tolist()

    def openapi_request_body(self) -> dict[str, t.Any]:
        return {
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "required": True,
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    def openapi_responses(self) -> OpenAPIResponse:
        return {
            "description": SUCCESS_DESCRIPTION,
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    def validate_array(
        self, arr: ext.NpNDArray, exception_cls: t.Type[Exception] = BadInput
    ) -> ext.NpNDArray:
        if self._dtype is not None and self._dtype != arr.dtype:
            # "same_kind" means only safe casts or casts within a kind,
            # like float64 to float32, are allowed.
            if np.can_cast(arr.dtype, self._dtype, casting="same_kind"):
                arr = arr.astype(self._dtype, casting="same_kind")  # type: ignore
            else:
                msg = '%s: Expecting ndarray of dtype "%s", but "%s" was received.'
                if self._enforce_dtype:
                    raise exception_cls(
                        msg % (self.__class__.__name__, self._dtype, arr.dtype)
                    ) from None
                else:
                    logger.debug(msg, self.__class__.__name__, self._dtype, arr.dtype)

        if self._shape is not None and not _is_matched_shape(self._shape, arr.shape):
            msg = '%s: Expecting ndarray of shape "%s", but "%s" was received.'
            if self._enforce_shape:
                raise exception_cls(
                    msg % (self.__class__.__name__, self._shape, arr.shape)
                ) from None
            try:
                arr = arr.reshape(self._shape)
            except ValueError as e:
                logger.debug(
                    msg + " Failed to reshape: %s",
                    self.__class__.__name__,
                    self._shape,
                    arr.shape,
                    e,
                )

        return arr
    async def from_http_request(self, request: Request) -> ext.NpNDArray:
        """
        Process an incoming request and convert the incoming object to ``numpy.ndarray``.

        Args:
            request: Incoming Request

        Returns:
            a ``numpy.ndarray`` object. This can then be used
            inside user-defined logic.
        """
        obj = await request.json()
        try:
            res = np.array(obj, dtype=self._dtype)
        except ValueError:
            res = np.array(obj)
        return self.validate_array(res)
    async def to_http_response(self, obj: ext.NpNDArray, ctx: Context | None = None):
        """
        Process a given object and convert it to an HTTP response.

        Args:
            obj: ``np.ndarray`` that will be serialized to JSON
            ctx: ``Context`` object that contains information about the request.

        Returns:
            HTTP Response of type ``starlette.responses.Response``. This can
            be accessed via cURL or any external web traffic.
        """
        obj = self.validate_array(obj)

        if ctx is not None:
            res = Response(
                json.dumps(obj.tolist()),
                media_type=self._mime_type,
                headers=ctx.response.metadata,  # type: ignore (bad starlette types)
                status_code=ctx.response.status_code,
            )
            set_cookies(res, ctx.response.cookies)
            return res
        else:
            return Response(json.dumps(obj.tolist()), media_type=self._mime_type)
    def _from_sample(self, sample: ext.NpNDArray | t.Sequence[t.Any]) -> ext.NpNDArray:
        """
        Create a :class:`~bentoml._internal.io_descriptors.numpy.NumpyNdarray` IO Descriptor from given inputs.

        Args:
            sample: Given sample ``np.ndarray`` data. It also accepts a sequence-like data type
                    that can be converted to ``np.ndarray``.
            enforce_dtype: Enforce a certain data type. ``dtype`` must be specified at function
                           signature. If you don't want to enforce a specific dtype then change
                           ``enforce_dtype=False``.
            enforce_shape: Enforce a certain shape. ``shape`` must be specified at function
                           signature. If you don't want to enforce a specific shape then change
                           ``enforce_shape=False``.

        Returns:
            :class:`~bentoml._internal.io_descriptors.numpy.NumpyNdarray`: IODescriptor from the given user input.

        Example:

        .. code-block:: python
           :caption: `service.py`

           from __future__ import annotations

           from typing import TYPE_CHECKING, Any

           import bentoml
           from bentoml.io import NumpyNdarray
           import numpy as np

           if TYPE_CHECKING:
               from numpy.typing import NDArray

           input_spec = NumpyNdarray.from_sample(np.array([[1,2,3]]))

           @svc.api(input=input_spec, output=NumpyNdarray())
           async def predict(input: NDArray[np.int16]) -> NDArray[Any]:
               return await runner.async_run(input)

        Raises:
            :class:`BadInput`: If the given sample is of type ``numpy.generic``. This exception
                               will also be raised if we failed to create a ``np.ndarray`` from
                               the given sample.
        """
        if isinstance(sample, np.generic):
            raise BadInput(
                "'NumpyNdarray.from_sample()' expects a 'numpy.array', not 'numpy.generic'."
            ) from None
        try:
            if not isinstance(sample, np.ndarray):
                sample = np.array(sample)
        except ValueError:
            raise BadInput(f"Given sample ({sample}) is not a numpy ND-array") from None
        if self._dtype is None:
            self._dtype = sample.dtype
        if self._shape is None:
            self._shape = sample.shape
        return sample
    async def from_proto(self, field: pb.NDArray | bytes) -> ext.NpNDArray:
        """
        Process an incoming protobuf request and convert it to ``numpy.ndarray``.

        Args:
            field: Incoming RPC request message, either a ``pb.NDArray`` or raw ``bytes``.

        Returns:
            ``numpy.ndarray``: A ``np.array`` constructed from the given protobuf message.

        .. seealso::

           :ref:`Protobuf representation of np.ndarray <guides/grpc:Array representation via \\`\\`NDArray\\`\\`>`

        .. note::

           Currently, we support ``pb.NDArray`` and ``serialized_bytes`` as valid inputs.
           ``serialized_bytes`` will be prioritised over ``pb.NDArray`` if both are provided.
           Serialized bytes have a specialized bytes representation and should not be used
           by users directly.
        """
        if isinstance(field, bytes):
            # We will be using ``np.frombuffer`` to deserialize the bytes.
            # This means that ``dtype`` must be provided to the IO descriptor:
            #
            # ```python
            # from __future__ import annotations
            #
            # import numpy as np
            #
            # @svc.api(input=NumpyNdarray(dtype=np.float16), output=NumpyNdarray())
            # def predict(input: np.ndarray):
            #     # input will be deserialized with np.frombuffer, hence dtype is required
            #     ...
            # ```
            if not self._dtype:
                raise BadInput(
                    "'serialized_bytes' requires specifying 'dtype'."
                ) from None

            dtype: ext.NpDTypeLike = self._dtype
            array = np.frombuffer(field, dtype=dtype)
        else:
            if isinstance(field, pb_v1alpha1.NDArray):
                version = "v1alpha1"
            elif isinstance(field, pb.NDArray):
                version = "v1"
            else:
                raise BadInput(
                    f"Expected 'pb.NDArray' or 'bytes' but received {type(field)}"
                ) from None

            # The behaviour of dtype is as follows:
            # - if not provided:
            #     * If all of the fields are empty, we return ``np.empty``.
            #     * Otherwise we loop through the provided fields and allow only
            #       one populated field per message. If more than one field is set
            #       (e.g. both ``string_values`` and ``float_values``), we raise an
            #       error, as we don't know how to deserialize the data.
            # - if provided:
            #     * We use the dtype-to-field maps to read the data from the given message.
            if field.dtype == pb.NDArray.DTYPE_UNSPECIFIED:
                dtype = None
            else:
                try:
                    dtype = dtypepb_to_npdtype_map(version=version)[field.dtype]
                except KeyError:
                    raise BadInput(f"{field.dtype} is invalid.") from None
            if dtype is not None:
                values_array = getattr(
                    field, dtypepb_to_fieldpb_map(version=version)[field.dtype]
                )
            else:
                fieldpb = [
                    f.name for f, _ in field.ListFields() if f.name.endswith("_values")
                ]
                if len(fieldpb) == 0:
                    # The input message doesn't have any populated fields.
                    return np.empty(shape=field.shape or 0)
                elif len(fieldpb) > 1:
                    # More than one values field was provided in the proto.
                    raise BadInput(
                        f"Array contents can only be one of given values key. Use one of '{fieldpb}' instead.",
                    ) from None

                dtype: ext.NpDTypeLike = fieldpb_to_npdtype_map()[fieldpb[0]]
                values_array = getattr(field, fieldpb[0])
            try:
                array = np.array(values_array, dtype=dtype)
            except ValueError:
                array = np.array(values_array)

            # Reshape the array if ``shape`` is provided in the message. Note that
            # ``np.reshape`` returns a view where possible, so this avoids copying
            # the initialized array.
            if field.shape:
                array = np.reshape(array, field.shape)

        # Run validation before handing the array off to the user.
        return self.validate_array(array)
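    # Illustrative sketch (not part of the module): decoding a hand-built v1
    # message the same way the gRPC server would (in an async context):
    #
    #     io_ = NumpyNdarray(dtype="float32")
    #     msg = pb.NDArray(
    #         dtype=pb.NDArray.DTYPE_FLOAT,
    #         shape=(2, 2),
    #         float_values=[1.0, 2.0, 3.0, 4.0],
    #     )
    #     arr = await io_.from_proto(msg)  # -> 2x2 float32 ndarray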
    @t.overload
    async def _to_proto_impl(
        self, obj: ext.NpNDArray, *, version: t.Literal["v1"]
    ) -> pb.NDArray:
        ...

    @t.overload
    async def _to_proto_impl(
        self, obj: ext.NpNDArray, *, version: t.Literal["v1alpha1"]
    ) -> pb_v1alpha1.NDArray:
        ...

    async def _to_proto_impl(
        self, obj: ext.NpNDArray, *, version: str
    ) -> _message.Message:
        """
        Process a given object and convert it to a gRPC protobuf response.

        Args:
            obj: ``np.array`` that will be serialized to protobuf.

        Returns:
            ``pb.NDArray``: Protobuf representation of the given ``np.ndarray``.
        """
        try:
            obj = self.validate_array(obj)
        except BadInput as e:
            raise e from None

        pb, _ = import_generated_stubs(version)

        try:
            fieldpb = npdtype_to_fieldpb_map()[obj.dtype]
            dtypepb = npdtype_to_dtypepb_map(version=version)[obj.dtype]
            return pb.NDArray(
                dtype=dtypepb,
                shape=tuple(obj.shape),
                **{fieldpb: obj.ravel().tolist()},
            )
        except KeyError:
            raise BadInput(
                f"Unsupported dtype '{obj.dtype}' for response message.",
            ) from None

    def from_arrow(self, batch: pyarrow.RecordBatch) -> ext.NpNDArray:
        df = batch.to_pandas()
        if not LazyType("pandas", "DataFrame").isinstance(df):
            raise InvalidArgument("Unable to convert input into numpy ndarray.")
        return df.to_numpy()

    def to_arrow(self, arr: ext.NpNDArray) -> pyarrow.RecordBatch:
        import pyarrow

        return pyarrow.RecordBatch.from_arrays([pyarrow.array(arr)], names=["output"])

    def spark_schema(self) -> pyspark.sql.types.StructType:
        from pyspark.pandas.typedef import as_spark_type
        from pyspark.sql.types import StructField
        from pyspark.sql.types import StructType

        if self._dtype is None:
            raise InvalidArgument(
                "Cannot perform batch inference with a numpy output without a known dtype; please provide a dtype."
            )
        if self._shape is None:
            raise InvalidArgument(
                "Cannot perform batch inference with a numpy output without a known shape; please provide a shape."
            )
        if len(self._shape) != 1:
            raise InvalidArgument(
                "Cannot perform batch inference with a multidimensional numpy ndarray output; consider using pandas DataFrames instead."
            )

        try:
            out_spark_type = as_spark_type(self._dtype)
        except TypeError:
            raise InvalidArgument(
                f"dtype {self._dtype} is not supported for batch inference."
            )

        return StructType([StructField("out", out_spark_type, nullable=False)])
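    # Illustrative sketch (not part of the module): a one-dimensional round
    # trip through Arrow, assuming pyarrow and pandas are installed. Note that
    # ``from_arrow`` goes through a pandas DataFrame, so the restored array is
    # column-shaped:
    #
    #     io_ = NumpyNdarray(dtype="int64", shape=(-1,))
    #     batch = io_.to_arrow(np.array([1, 2, 3]))
    #     restored = io_.from_arrow(batch)  # ndarray of shape (3, 1)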
    async def to_proto(self, obj: ext.NpNDArray) -> pb.NDArray:
        return await self._to_proto_impl(obj, version="v1")

    async def to_proto_v1alpha1(self, obj: ext.NpNDArray) -> pb_v1alpha1.NDArray:
        return await self._to_proto_impl(obj, version="v1alpha1")