Batch Pipeline API Usage Guide
Modin provides an experimental batching feature that pipelines row-parallel queries. This feature
is currently only supported for the PandasOnRay engine. Please note that this feature is experimental,
and its behavior or interfaces may change.
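Because the feature currently requires the PandasOnRay engine, it can be worth confirming the active
engine before building a pipeline. The following is a minimal sketch using Modin's configuration API;
adjust it to match your setup.
import modin.config as cfg
cfg.Engine.put("Ray")  # Ensure Modin dispatches to Ray, since PandasOnRay is the supported engine here.
print(cfg.Engine.get())  # Expected: "Ray"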
Usage examples
In the examples below, we build and run several pipelines. It is important to note that the queries passed to
the pipeline operate on Modin DataFrame partitions, which are backed by pandas. When using pandas
module-level functions, please make sure to import and use pandas rather than modin.pandas.
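For instance, a query that needs a module-level helper such as concat should call it from pandas, as in
the hypothetical query below (the function name and the operation it performs are purely illustrative):
import pandas  # Note: plain pandas, not modin.pandas.
def duplicate_rows(df):
    # `df` is the pandas DataFrame backing one Modin partition, so module-level
    # helpers must come from pandas rather than modin.pandas.
    return pandas.concat([df, df], ignore_index=True)
A function like this can then be passed to PandasQueryPipeline.add_query just like the lambdas used below.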
Simple Batch Pipelining
This example walks through a simple batch pipeline in order to familiarize the user with the API.
from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np
df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(lambda df: df + 1, is_output=True) # Add the first query and specify that
# it is an output query.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
)  # Add a third query and specify that it is an output query.
new_df = pd.DataFrame(
    np.ones((100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build a second dataframe that we will pipeline now instead.
pipeline.update_df(new_df) # Update the dataframe that we will pipeline to be `new_df`
# instead of `df`.
result_dfs = pipeline.compute_batch() # Begin batch processing.
# Print pipeline results
print(f"Result of Query 1:\n{result_dfs[0]}")
print(f"Result of Query 2:\n{result_dfs[1]}")
# Output IDs can also be specified
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
)  # Add the first query, specify that it is an output query, and specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
)  # Add a third query, specify that it is an output query, and specify an output id.
result_dfs = pipeline.compute_batch() # Begin batch processing.
# Print pipeline results - should be a dictionary mapping Output IDs to resulting dataframes:
print(f"Mapping of Output ID to dataframe:\n{result_dfs}")
# Print results
for query_id, res_df in result_dfs.items():
    print(f"Query {query_id} resulted in\n{res_df}")
Batch Pipelining with Postprocessing
A postprocessing function can also be provided when calling pipeline.compute_batch. The example
below runs a pipeline similar to the one above, but the postprocessing function writes each output
partition to a parquet file.
from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np
import os
import shutil
df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
)  # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
)  # Add the first query, specify that it is an output query, and specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}": f"col {i-1}" for i in range(1, 101)})
)  # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
)  # Add a third query, specify that it is an output query, and specify an output id.
def postprocessing_func(df, output_id, partition_id):
    filepath = f"query_{output_id}/"
    os.makedirs(filepath, exist_ok=True)
    filepath += f"part-{partition_id:04d}.parquet"
    df.to_parquet(filepath)
    return df
result_dfs = pipeline.compute_batch(
    postprocessor=postprocessing_func,
    pass_partition_id=True,
    pass_output_id=True,
)  # Begin computation, pass in a postprocessing function, and specify that partition ID and
   # output ID should be passed to that postprocessing function.
print(os.system("ls query_1/")) # Should show `NPartitions.get()` parquet files - which
# correspond to partitions of the output of query 1.
print(os.system("ls query_2/")) # Should show `NPartitions.get()` parquet files - which
# correspond to partitions of the output of query 2.
for query_id, res_df in result_dfs.items():
    written_df = pd.read_parquet(f"query_{query_id}/")
    shutil.rmtree(f"query_{query_id}/")  # Clean up
    print(f"Written and Computed DF are " +
          f"{'equal' if res_df.equals(written_df) else 'not equal'} for query {query_id}")
Batch Pipelining with Fan Out
If the input dataframe to a query is small (consisting of only one partition), it is possible to
induce additional parallelism using the fan_out argument. The fan_out argument replicates
the input partition, applies the query to each replica, and then coalesces all of the replicas back
into one partition using the reduce_fn, which must also be specified when fan_out is True.
It is possible to control the parallelism via the num_partitions parameter passed to the
constructor of the PandasQueryPipeline. This parameter designates the desired number of partitions,
and defaults to NPartitions.get() when not specified. During fan out, the input partition is replicated
num_partitions times. In the previous examples, num_partitions was not specified, and so defaulted
to NPartitions.get().
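As a minimal sketch of the mechanics (separate from the image-processing example that follows), a
fan-out query receives the same single-partition input in each replica along with a partition id, and
the reduce function receives the list of per-replica results. The names fanned_query, combine, and the
column names below are purely illustrative:
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
small_df = pd.DataFrame([[0]], columns=["seed"])  # A single-partition input.
def fanned_query(df, partition_id):
    # Each replica sees the same input partition plus its own partition id.
    df = df.copy()
    df["replica"] = partition_id
    return df
def combine(dfs):
    # Coalesce the list of per-replica results back into one partition.
    import pandas
    return pandas.concat(dfs, ignore_index=True)
pipeline = PandasQueryPipeline(small_df)  # num_partitions defaults to NPartitions.get().
pipeline.add_query(
    fanned_query,
    fan_out=True,
    reduce_fn=combine,
    pass_partition_id=True,
    is_output=True,
)
print(pipeline.compute_batch()[0])  # One row per replica, NPartitions.get() rows in total.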
The example below demonstrates the usage of fan_out and num_partitions. We first demonstrate
an example of a function that would benefit from this computation pattern:
import glob
from PIL import Image
import torchvision.transforms as T
import torchvision
transforms = T.Compose([T.ToTensor()])
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
COCO_INSTANCE_CATEGORY_NAMES = [
'__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]
def contains_cat(image, model):
    image = transforms(image)
    labels = [COCO_INSTANCE_CATEGORY_NAMES[i] for i in model([image])[0]['labels']]
    return 'cat' in labels
def serial_query(df):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. Each image in the folder is passed through a neural network
    that detects whether it contains a cat, in serial, and a new column is computed for the
    dataframe that counts the number of images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process.

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    cats = 0
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df
To download the image files to test out this code, run the following bash script, which downloads
the images from the fast-ai-coco S3 bucket to a folder called images in your current working
directory:
aws s3 cp s3://fast-ai-coco/coco_tiny.tgz . --no-sign-request; tar -xf coco_tiny.tgz; mkdir \
images; mv coco_tiny/train/* images/; rm -rf coco_tiny; rm -rf coco_tiny.tgz
We can pipeline that code like so:
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
from time import time
df = pd.DataFrame([['images']], columns=['images'])
pipeline = PandasQueryPipeline(df)
pipeline.add_query(serial_query, is_output=True)
serial_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
serial_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")
We can induce 8x parallelism into the pipeline above by combining the fan_out and num_partitions parameters like so:
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import shutil
from time import time
df = pd.DataFrame([['images']], columns=['images'])
desired_num_partitions = 8
def parallel_query(df, partition_id):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. It parses `total_images/desired_num_partitions` images every
    time it is called. A new column is computed for the dataframe that counts the number of
    images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process.
    partition_id : int
        The partition id of the dataframe that this function runs on.

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    total_images = len(images)
    cats = 0
    start_index = partition_id * (total_images // desired_num_partitions)
    if partition_id == desired_num_partitions - 1:  # Last partition must parse to end of list
        images = images[start_index:]
    else:
        end_index = (partition_id + 1) * (total_images // desired_num_partitions)
        images = images[start_index:end_index]
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df
def reduce_fn(dfs):
    """
    Coalesce the results of fanning out the `parallel_query` query.

    Parameters
    ----------
    dfs : a list of dataframes
        The resulting dataframes from fanning out `parallel_query`.

    Returns
    -------
    A new dataframe whose `cat_count` column is the sum of the `cat_count` column of all
    dataframes in `dfs`.
    """
    df = dfs[0]
    cat_count = df['cat_count'][0]
    for dataframe in dfs[1:]:
        cat_count += dataframe['cat_count'][0]
    df['cat_count'] = cat_count
    return df
pipeline = PandasQueryPipeline(df, desired_num_partitions)
pipeline.add_query(
    parallel_query,
    fan_out=True,
    reduce_fn=reduce_fn,
    is_output=True,
    pass_partition_id=True,
)
parallel_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
parallel_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")
print(f"Total Time in Serial: {serial_end - serial_start}")
print(f"Total time with induced parallelism: {parallel_end - parallel_start}")
shutil.rmtree("images/") # Clean up
Batch Pipelining with Dynamic Repartitioning
Similarly, it is also possible to hint to the Pipeline API that it should repartition after a node completes
its computation. This is currently only supported if the input dataframe consists of only one partition.
The number of partitions after repartitioning is controlled by the num_partitions parameter
passed to the constructor of the PandasQueryPipeline.
The following example demonstrates how to use the repartition_after parameter.
import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import numpy as np
small_df = pd.DataFrame([[1, 2, 3]]) # Create a small dataframe
def increase_dataframe_size(df):
    import pandas
    new_df = pandas.concat([df] * 1000)
    new_df = new_df.reset_index(drop=True)  # Get a new range index that isn't duplicated.
    return new_df
desired_num_partitions = 24 # We will repartition to 24 partitions
def add_partition_id_to_df(df, partition_id):
    import pandas
    new_col = pandas.Series([partition_id] * len(df), name="partition_id", index=df.index)
    return pandas.concat([df, new_col], axis=1)
pipeline = PandasQueryPipeline(small_df, desired_num_partitions)
pipeline.add_query(increase_dataframe_size, repartition_after=True)
pipeline.add_query(add_partition_id_to_df, pass_partition_id=True, is_output=True)
result_df = pipeline.compute_batch()[0]
print(f"Number of partitions passed to second query: " +
f"{len(np.unique(result_df['partition_id'].values))}")
print(f"Result of pipeline:\n{result_df}")