Apache Flink#

Apache Flink DataStream#

BentoML supports streaming model inference in the Apache Flink DataStream API through either embedded runners or remote calls to a separately deployed Bento Service. This guide assumes prior knowledge of using runners and service APIs.

Embedded Model Runners#

In BentoML, a Runner represents a unit of computation, such as model inference, that can be executed either in a remote runner process or in an embedded runner instance. If available system resources allow loading the ML model in memory, invoking runners as embedded instances typically achieves better performance by avoiding the overhead of interprocess communication.

Runners can be initialized as embedded instances by calling init_local(). Once a runner is initialized, inference functions can be invoked on the runner.

import bentoml

classifier_runner = bentoml.transformers.get("text-classification:latest").to_runner()
classifier_runner.init_local()
classifier_runner.run(INPUT_TEXT)  # INPUT_TEXT is a placeholder for the text to classify
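
This snippet and the examples that follow assume a Transformers text classification pipeline has already been saved to the local BentoML model store under the name text-classification. A minimal sketch of how such a model could be saved beforehand (the pipeline task and its default model are assumptions for illustration, not part of the original example):

import bentoml

from transformers import pipeline

# Save a Hugging Face text classification pipeline to the local BentoML model
# store so that it can later be retrieved as "text-classification:latest".
classification_pipeline = pipeline("text-classification")
bentoml.transformers.save_model("text-classification", classification_pipeline)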

To integrate with the Flink DataStream API, runners can be invoked in a MapFunction for per-record inference, or in a window function such as ProcessWindowFunction for batched inference (a sketch of the batched variant follows the snippet below).

import bentoml

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, MapFunction

class ClassifyFunction(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        self.runner = bentoml.transformers.get(
            "text-classification:latest"
        ).to_runner()
        self.runner.init_local()

    def map(self, data):
        # data is an (id, text) tuple; apply any preprocessing to the text here
        return data[0], self.runner.run(data[1])
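
For batched inference, the same embedded runner can be invoked once per window rather than once per record. Below is a minimal sketch of the batched variant using PyFlink's ProcessWindowFunction; the keying and window assignment (key_by, window) are omitted and assumed to be configured on the stream, and the (id, text) tuple layout follows the example above.

from typing import Iterable, Tuple

import bentoml

from pyflink.datastream.functions import ProcessWindowFunction, RuntimeContext


class BatchClassifyFunction(ProcessWindowFunction):
    def open(self, runtime_context: RuntimeContext):
        self.runner = bentoml.transformers.get("text-classification:latest").to_runner()
        self.runner.init_local()

    def process(self, key, context, elements: Iterable[Tuple[int, str]]):
        records = list(elements)
        # Run the model once over all texts collected in the window
        results = self.runner.run([text for _, text in records])
        for (record_id, _), result in zip(records, results):
            yield record_id, result

    def clear(self, context):
        pass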

The following is an end-to-end text classification example using embedded runners in a Flink DataStream program. For simplicity, the input stream and output sink are represented by an in-memory collection and a stdout sink.

import bentoml
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import RuntimeContext, MapFunction


class ClassifyFunction(MapFunction):
    def open(self, runtime_context: RuntimeContext):
        self.runner = bentoml.transformers.get("text-classification:latest").to_runner()
        self.runner.init_local()

    def map(self, data):
        # data is an (id, text) tuple; apply any preprocessing to the text here
        return data[0], self.runner.run(data[1])


def classify_tweets():
    # Create a StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Create source DataStream, e.g. Kafka, Table, etc. Example uses data collection for simplicity.
    ds = env.from_collection(
        collection=[
            (1, "BentoML: Create an ML Powered Prediction Service in Minutes via @TDataScience https://buff.ly/3srhTw9 #Python #MachineLearning #BentoML"),
            (2, "Top MLOps Serving frameworks — 2021 https://link.medium.com/5Elq6Aw52ib #mlops #TritonInferenceServer #opensource #nvidia #machincelearning  #serving #tensorflow #PyTorch #Bodywork #BentoML #KFServing #kubeflow #Cortex #Seldon #Sagify #Syndicai"),
            (3, "#MLFlow provides components for experimentation management, ML project management. #BentoML only focuses on serving and deploying trained models"),
            (4, "2000 and beyond #OpenSource #bentoml"),
            (5, "Model Serving Made Easy https://github.com/bentoml/BentoML ⭐ 1.1K #Python #Bentoml #BentoML #Modelserving #Modeldeployment #Modelmanagement #Mlplatform #Mlinfrastructure #Ml #Ai #Machinelearning #Awssagemaker #Awslambda #Azureml #Mlops #Aiops #Machinelearningoperations #Turn")
        ]
    )

    # Define the execution logic
    ds = ds.map(ClassifyFunction())

    # Create sink and emit result to sink, e.g. Kafka, File, Table, etc. Example prints to stdout for simplicity.
    ds.print()

    # Submit for execution
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    classify_tweets()

Remote Bento Service#

Model runners can also be invoked remotely as a separately deployed Bento Service. Calling a remote Bento Service may be preferred if the model cannot be loaded into the memory of the Flink DataStream program. This option is also advantageous because model runners can be scaled more easily with deployment frameworks like Yatai.

To send a prediction request to a remotely deployed Bento Service in the DataStream program, you can use any HTTP client implementation of your choice inside the MapFunction or ProcessWindowFunction.

import requests

from pyflink.datastream.functions import MapFunction


class ClassifyFunction(MapFunction):
    def map(self, data):
        # Post each record to the /classify endpoint of the remote Bento Service
        return requests.post(
            "http://127.0.0.1:3000/classify",
            headers={"content-type": "text/plain"},
            data=data,
        ).text
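
The snippet above assumes a Bento Service exposing a /classify endpoint that accepts plain text. A minimal sketch of such a service definition (the service name and IO descriptors are assumptions for illustration):

import bentoml

from bentoml.io import JSON, Text

runner = bentoml.transformers.get("text-classification:latest").to_runner()

svc = bentoml.Service("text_classification_service", runners=[runner])


@svc.api(input=Text(), output=JSON())
def classify(text: str):
    # Delegate inference to the runner; when served with `bentoml serve`,
    # this API is exposed at POST /classify
    return runner.run(text)

Served locally, for example with bentoml serve service:svc, this exposes the http://127.0.0.1:3000/classify endpoint used in the MapFunction above.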

Using a client with asynchronous IO support, combined with Flink's AsyncFunction, is recommended to handle requests and responses concurrently and to minimize the IO waiting time of calling a remote Bento Service.
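
As one illustration of the asynchronous pattern, the sketch below issues prediction requests for a batch of texts concurrently using aiohttp; the wiring into Flink's async IO operators is omitted, and the service URL and /classify endpoint are the same assumptions as in the previous sketch.

import asyncio

import aiohttp

BENTO_URL = "http://127.0.0.1:3000/classify"  # assumed remote Bento Service endpoint


async def classify_batch(texts):
    # Issue all prediction requests concurrently instead of sequentially, so the
    # total waiting time is close to that of the slowest single request.
    async with aiohttp.ClientSession() as session:

        async def classify(text):
            async with session.post(
                BENTO_URL, headers={"content-type": "text/plain"}, data=text
            ) as response:
                return await response.text()

        return await asyncio.gather(*(classify(text) for text in texts))


results = asyncio.run(classify_batch(["first tweet", "second tweet"]))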
