Spark#

Apache Spark is a general-purpose distributed processing system used for big data workloads. It allows for processing large datasets through an in-memory computation model, which can improve the performance of big data processing tasks. It also provides a wide range of APIs and a feature-rich set of tools for structured data processing, machine learning, and stream processing for big-data applications.

BentoML now supports running your Bentos with batch data via Spark. The following tutorial assumes basic understanding of BentoML. If you’d like to learn more about BentoML, see the BentoML tutorial.

Prerequisites#

Make sure to have at least BentoML 1.0.13 and Spark version 3.3.0 available in your system.

$ pip install -U "bentoml>=1.0.13"

In addition, both BentoML and your service’s dependencies (including model dependencies) must also be installed in the Spark cluster. Most likely, the service you are hosting Spark on has its own mechanisms for doing this. If you are using a standalone cluster, you should install those dependencies on every node you expect to use.

Finally, we use the quickstart bento from the aforementioned tutorial. If you have already followed that tutorial, you should already have that bento. If you have note, simply run the following:

import urllib.request
urllib.request.urlretrieve("https://bentoml-public.s3.us-west-1.amazonaws.com/quickstart/iris_classifier.bento", "iris_classifier.bento")
bentoml.import_bento("iris_classifier.bento")

Run Bentos in Spark#

Note

All of the following commands/APIs should work for bentos with IO Descriptor that support batch inference. Currently, those descriptors are bentoml.io.NumpyNdarray, bentoml.io.PandasDataFrame, and bentoml.io.PandasSeries.

IMPORTANT: your Bento API must be capable of accepting multiple inputs. For example, batch_classify(np.array([[input_1], [input_2]])) must work, and return np.array([[output_1], [output_2]]). The quickstart bento supports this pattern because the iris classifier model it contains does.

Create a PySpark SparkSession object#

This will be used to create a DataFrame from the input data, and to run the batch inference job. If you’re running in a notebook with spark already (e.g. a VertexAI PySpark notebook or a Databricks Notebook), you can skip this step.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Load the input data into a PySpark DataFrame#

If you are using multipart input, or your dataframe requires column names, you must also provide a schema for your DataFrame as you load it. You can do this using the spark.read.csv() method, which takes a file path as input and returns a DataFrame containing the data from the file.

from pyspark.sql.types import StructType, StructField, FloatType, StringType
import urllib.request

urllib.request.urlretrieve("https://docs.bentoml.com/en/latest/_static/examples/batch/input.csv", "input.csv")

schema = StructType([
    StructField("sepal_length", FloatType(), False),
    StructField("sepal_width", FloatType(), False),
    StructField("petal_length", FloatType(), False),
    StructField("petal_width", FloatType(), False),
])
df = spark.read.csv("input.csv", schema=schema)

Create a BentoService object#

Create a BentoService object using the BentoML service you want to use for the batch inference job. Here, we first try to use bentoml.get to get the bento from the local BentoML store. If it is not found, we retrieve the bento from the BentoML public S3 and import it.

import bentoml

bento = bentoml.get("iris_classifier:latest")

Run the batch inference job#

Run the batch inference job using the bentoml.batch.run_in_spark() method. This method takes the API name, the Spark DataFrame containing the input data, and the Spark session itself as parameters, and it returns a DataFrame containing the results of the batch inference job.

results_df = bentoml.batch.run_in_spark(bento, "classify", df, spark)

Internally, what happens when you run run_in_spark is as follows:

  • First, the bento is distributed to the cluster. Note that if the bento has already been distributed, i.e. you have already run a computation with that bento, this step is skipped.

  • Next, a process function is created, which runs the API method on every Spark batch given it. The batch size can be controlled by setting spark.sql.execution.arrow.maxRecordsPerBatch. PySpark pickles this process function and dispatches it, along with the relevant data, to the workers.

  • Finally, the function is evaluated on the given dataframe. Once all methods that the user defined in the script have been executed, the data is returned to the master node.

Save the results#

Finally, save the results of the batch inference job to a file using the DataFrame.write.csv() method. This method takes a file path as input and saves the contents of the DataFrame to the specified file.

results_df.write.csv("output")

Upon success, you should see multiple files in the output folder: an empty _SUCCESS file and one or more part-*.csv files containing your output.

$ ls output
_SUCCESS  part-00000-85fe41df-4005-4991-a6ad-98b6ed549993-c000.csv
$ head output/part-00000-d8fe59de-0233-4a80-8bda-519ce98223ea-c000.csv
1.0
0.0
2.0
0.0

Spark supports many formats other than CSV; see the Spark documentation for a full list.