Flink#

Apache Flink DataStream#

BentoML supports streaming model inference in the Apache Flink DataStream API through either embedded runners or remote calls to a separately deployed Bento Service. This guide assumes prior knowledge of using runners and service APIs.

Embedded Model Runners#

In BentoML, a Runner represents a unit of computation, such as model inference, that can be executed either in a remote runner process or as an embedded runner instance. If available system resources allow loading the ML model in memory, invoking runners as embedded instances typically achieves better performance by avoiding the overhead of interprocess communication.

Runners can be initialized as embedded instances by calling init_local(). Once a runner is initialized, inference functions can be invoked on it directly.

import bentoml

# Load the saved model as a runner and initialize it in the current process.
# INPUT_TEXT is a placeholder for the text to classify.
classifier_runner = bentoml.transformers.get("text-classification:latest").to_runner()
classifier_runner.init_local()
classifier_runner.run(INPUT_TEXT)

To integrate with the Flink DataStream API, runners can be used in a MapFunction or ProcessWindowFunction for per-element inference, or in a WindowFunction for batched inference.

import bentoml

from pyflink.datastream.functions import RuntimeContext, MapFunction

class ClassifyFunction(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        # Load the model as an embedded runner once, when the function is opened on a task
        self.runner = bentoml.transformers.get(
            "text-classification:latest"
        ).to_runner()
        self.runner.init_local()

    def map(self, data):
        # Optionally transform the input, then run inference on the text field of (id, text)
        return data[0], self.runner.run(data[1])

The following is an end-to-end text classification example using embedded runners in a Flink DataStream program. For simplicity, the input stream and output sink are abstracted away using an in-memory collection and a stdout sink.

import bentoml
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, MapFunction


class ClassifyFunction(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        self.runner = bentoml.transformers.get("text-classification:latest").to_runner()
        self.runner.init_local()

    def map(self, data):
        # transform(data)
        return data[0], self.runner.run(data[1])


def classify_tweets():
    # Create a StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Create source DataStream, e.g. Kafka, Table, etc. Example uses data collection for simplicity.
    ds = env.from_collection(
        collection=[
            (1, "BentoML: Create an ML Powered Prediction Service in Minutes via @TDataScience https://buff.ly/3srhTw9 #Python #MachineLearning #BentoML"),
            (2, "Top MLOps Serving frameworks — 2021 https://link.medium.com/5Elq6Aw52ib #mlops #TritonInferenceServer #opensource #nvidia #machincelearning  #serving #tensorflow #PyTorch #Bodywork #BentoML #KFServing #kubeflow #Cortex #Seldon #Sagify #Syndicai"),
            (3, "#MLFlow provides components for experimentation management, ML project management. #BentoML only focuses on serving and deploying trained models"),
            (4, "2000 and beyond #OpenSource #bentoml"),
            (5, "Model Serving Made Easy https://github.com/bentoml/BentoML ⭐ 1.1K #Python #Bentoml #BentoML #Modelserving #Modeldeployment #Modelmanagement #Mlplatform #Mlinfrastructure #Ml #Ai #Machinelearning #Awssagemaker #Awslambda #Azureml #Mlops #Aiops #Machinelearningoperations #Turn")
        ]
    )

    # Define the execution logic
    ds = ds.map(ClassifyFunction())

    # Create sink and emit result to sink, e.g. Kafka, File, Table, etc. Example prints to stdout for simplicity.
    ds.print()

    # Submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    classify_tweets()

Remote Bento Service#

Model runners can also be invoked remotely via a separately deployed Bento Service. Calling a remote Bento Service may be preferred if the model cannot be loaded into the memory of the Flink DataStream program. This option is also advantageous because model runners can be scaled more easily with deployment frameworks like Yatai.
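
For reference, the remote service might be defined along these lines. This is a minimal sketch assuming the same text-classification model as above; the service name and the classify endpoint are illustrative.

import bentoml
from bentoml.io import Text

# Expose the text-classification model behind an HTTP API (hypothetical service definition)
runner = bentoml.transformers.get("text-classification:latest").to_runner()

svc = bentoml.Service("text_classification", runners=[runner])

@svc.api(input=Text(), output=Text())
def classify(text: str) -> str:
    # Delegate inference to the runner and return the prediction as text
    return str(runner.run(text))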

To send a prediction request to a remotely deployed Bento Service in the DataStream program, you can use any HTTP client implementation of your choice inside the MapFunction or ProcessWindowFunction.

import requests

class ClassifyFunction(MapFunction):
    def map(self, data):
        # Post the text field of the (id, text) tuple to the remote /classify endpoint
        return data[0], requests.post(
            "http://127.0.0.1:3000/classify",
            headers={"content-type": "text/plain"},
            data=data[1],
        ).text

Using a client with asynchronous IO support combined with Flink AsyncFunction is recommended to handle requests and responses concurrently and minimize the IO waiting time of calling a remote Bento Service.
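
For illustration only, one way to issue requests concurrently from Python is to fan out a window of elements with an asynchronous HTTP client. The sketch below uses aiohttp and asyncio.gather inside a ProcessWindowFunction; the (id, text) tuple layout and the /classify endpoint are assumptions carried over from the examples above.

import asyncio

import aiohttp
from pyflink.datastream.functions import ProcessWindowFunction


class AsyncClassifyFunction(ProcessWindowFunction):
    def process(self, key, context, elements):
        async def classify_all(texts):
            # Issue one request per element in the window and await them concurrently
            async with aiohttp.ClientSession() as session:
                async def classify(text):
                    async with session.post(
                        "http://127.0.0.1:3000/classify",
                        headers={"content-type": "text/plain"},
                        data=text,
                    ) as response:
                        return await response.text()

                return await asyncio.gather(*(classify(t) for t in texts))

        items = list(elements)
        results = asyncio.run(classify_all([text for _, text in items]))
        return [(idx, result) for (idx, _), result in zip(items, results)]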
