Fleet Predictive Maintenance: Part 2. Data Preparation with Data Wrangler
Architecture
SageMaker Data Wrangler Job Notebook
This notebook uses the Data Wrangler .flow file to submit a SageMaker Data Wrangler Job with the following steps:
Push Data Wrangler .flow file to S3
Parse the .flow file inputs, and create the argument dictionary to submit to a boto client
Submit the ProcessingJob arguments and wait for Job completion
Optionally, the notebook also gives an example of starting a SageMaker XGBoost TrainingJob using the newly processed data.
[1]:
# SageMaker Python SDK version 2.x is required
import pkg_resources
import subprocess
import sys
original_version = pkg_resources.get_distribution("sagemaker").version
_ = subprocess.check_call([sys.executable, "-m", "pip", "install", "sagemaker==2.20.0"])
[2]:
import json
import os
import time
import uuid
import boto3
import sagemaker
Parameters
The following lists configurable parameters that are used throughout this notebook.
[3]:
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"
flow_file_name = "dw_flow/prm.flow"
iam_role = sagemaker.get_execution_role()
container_uri = (
"415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.2.1"
)
# Processing Job Resources Configurations
# Data wrangler processing job only supports 1 instance.
instance_count = 1
instance_type = "ml.m5.4xlarge"
# Processing Job Path URI Information
output_prefix = f"export-{flow_name}/output"
output_path = f"s3://{bucket}/{output_prefix}"
output_name = "ff586e7b-a02d-472b-91d4-da3dd05d7a30.default"
processing_job_name = f"data-wrangler-flow-processing-{flow_id}"
processing_dir = "/opt/ml/processing"
# Modify the variable below to specify the content type used for writing each output.
# Currently supported options are 'CSV' and 'PARQUET'; the default is 'CSV'.
output_content_type = "CSV"
# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None
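The naming scheme in the parameter cell can be reproduced without any AWS calls. The sketch below is only an illustration: `build_flow_names` and its `now` and `suffix` arguments are hypothetical additions that make the result deterministic, and the bucket name is a placeholder.

```python
import time
import uuid


def build_flow_names(bucket, prefix, now=None, suffix=None):
    """Derive flow and job names the same way the parameter cell does.

    `now` and `suffix` are hypothetical hooks, added only so the output is reproducible.
    """
    now = time.gmtime() if now is None else now
    suffix = str(uuid.uuid4())[:8] if suffix is None else suffix
    # Day-of-month plus time of day, followed by a short random suffix.
    flow_id = f"{time.strftime('%d-%H-%M-%S', now)}-{suffix}"
    flow_name = f"flow-{flow_id}"
    return {
        "flow_id": flow_id,
        "flow_uri": f"s3://{bucket}/{prefix}/{flow_name}.flow",
        "output_path": f"s3://{bucket}/export-{flow_name}/output",
        "processing_job_name": f"data-wrangler-flow-processing-{flow_id}",
    }


names = build_flow_names(
    "example-bucket", "data_wrangler_flows", now=time.gmtime(0), suffix="abcd1234"
)
for key, value in names.items():
    print(f"{key}: {value}")
```

Because the timestamp contains only the day of the month and the time, the random suffix is what keeps names from colliding across months.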
For this demo, the following cell has been added to the code generated by the Data Wrangler export. It updates the S3 location in the .flow file to point at your bucket and selects the Data Wrangler container URI that matches your region.
[ ]:
from demo_helpers import update_dw_s3uri, get_dw_container_for_region
# update the flow file to change the s3 location to our bucket
update_dw_s3uri(flow_file_name)
# get the Data Wrangler container associated with our region
region = boto3.Session().region_name
container_uri = get_dw_container_for_region(region)
dw_output_path_prm = output_path
print(
f"Storing dw_output_path_prm = {dw_output_path_prm} for use in next notebook 2_fleet_predmaint.ipynb"
)
%store dw_output_path_prm
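The source of `demo_helpers` is not shown in this notebook, so the following is only a guess at the kind of rewrite a helper such as `update_dw_s3uri` might perform, not its actual implementation. The function `retarget_flow_s3_uris` and the sample flow are hypothetical; the dictionary keys are the ones read later in this notebook when the processing inputs are built.

```python
import copy


def retarget_flow_s3_uris(flow, new_bucket):
    """Return a copy of `flow` whose S3 sources point at `new_bucket` (hypothetical helper)."""
    updated = copy.deepcopy(flow)
    for node in updated.get("nodes", []):
        data_def = node.get("parameters", {}).get("dataset_definition")
        if not data_def or data_def.get("datasetSourceType") != "S3":
            continue  # Only S3 sources carry an s3Uri to rewrite.
        context = data_def["s3ExecutionContext"]
        # Keep the object key and swap only the bucket:
        # s3://old-bucket/key -> s3://new-bucket/key
        _, _, rest = context["s3Uri"].partition("s3://")
        _, _, key = rest.partition("/")
        context["s3Uri"] = f"s3://{new_bucket}/{key}"
    return updated


# Hypothetical, heavily trimmed flow with a single S3 source node.
example_flow = {
    "nodes": [
        {
            "parameters": {
                "dataset_definition": {
                    "name": "fleet.csv",
                    "datasetSourceType": "S3",
                    "s3ExecutionContext": {"s3Uri": "s3://old-bucket/fleet/fleet.csv"},
                }
            }
        }
    ]
}

retargeted = retarget_flow_s3_uris(example_flow, "my-bucket")
new_uri = retargeted["nodes"][0]["parameters"]["dataset_definition"]["s3ExecutionContext"]["s3Uri"]
print(new_uri)
```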
Push Flow to S3
Use the following cell to upload the Data Wrangler .flow file to Amazon S3 so that it can be used as an input to the processing job.
[ ]:
# Load .flow file
with open(flow_file_name) as f:
    flow = json.load(f)
# Upload to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/{flow_name}.flow")
print(f"Data Wrangler flow file uploaded to {flow_uri}")
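The upload call spells out the object key separately from `flow_uri`, so the two can drift apart if one is edited. As a minimal sketch, the bucket and key can instead be derived from the URI itself; `split_s3_uri` is a hypothetical helper and the URI below is a placeholder.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key' into (bucket, key); hypothetical helper."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_uri("s3://example-bucket/data_wrangler_flows/flow-demo.flow")
print(bucket_name, key)
```

With this in place, the upload could be written as `s3_client.upload_file(flow_file_name, *split_s3_uri(flow_uri))`.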
Create Processing Job arguments
This notebook submits a Processing Job using the SageMaker Python SDK. Below, utility methods are defined for creating Processing Job inputs for the following sources: S3, Athena, and Redshift.
[6]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import (
AthenaDatasetDefinition,
DatasetDefinition,
RedshiftDatasetDefinition,
)
def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return ProcessingInput(
        source=flow_s3_uri,
        destination=f"{base_dir}/flow",
        input_name="flow",
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )
def create_s3_processing_input(s3_dataset_definition, name, base_dir):
    return ProcessingInput(
        source=s3_dataset_definition["s3ExecutionContext"]["s3Uri"],
        destination=f"{base_dir}/{name}",
        input_name=name,
        s3_data_type="S3Prefix",
        s3_input_mode="File",
        s3_data_distribution_type="FullyReplicated",
    )
def create_athena_processing_input(athena_dataset_definition, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            athena_dataset_definition=AthenaDatasetDefinition(
                catalog=athena_dataset_definition["catalogName"],
                database=athena_dataset_definition["databaseName"],
                query_string=athena_dataset_definition["queryString"],
                output_s3_uri=athena_dataset_definition["s3OutputLocation"] + f"{name}/",
                output_format=athena_dataset_definition["outputFormat"].upper(),
            ),
        ),
    )
def create_redshift_processing_input(redshift_dataset_definition, name, base_dir):
    return ProcessingInput(
        input_name=name,
        dataset_definition=DatasetDefinition(
            local_path=f"{base_dir}/{name}",
            redshift_dataset_definition=RedshiftDatasetDefinition(
                cluster_id=redshift_dataset_definition["clusterIdentifier"],
                database=redshift_dataset_definition["database"],
                db_user=redshift_dataset_definition["dbUser"],
                query_string=redshift_dataset_definition["queryString"],
                cluster_role_arn=redshift_dataset_definition["unloadIamRole"],
                output_s3_uri=redshift_dataset_definition["s3OutputLocation"] + f"{name}/",
                output_format=redshift_dataset_definition["outputFormat"].upper(),
            ),
        ),
    )
def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param processing_dir: base directory inside the processing container
    :param flow: parsed contents of the Data Wrangler .flow file
    :param flow_uri: S3 URI of the uploaded Data Wrangler .flow file
    :return: list of ProcessingInput objects (the flow itself plus one per data source)
    """
    processing_inputs = []
    # The .flow file is always the first input.
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)
    # Add one input for every node that defines a data source.
    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]
            if source_type == "S3":
                processing_inputs.append(create_s3_processing_input(data_def, name, processing_dir))
            elif source_type == "Athena":
                processing_inputs.append(
                    create_athena_processing_input(data_def, name, processing_dir)
                )
            elif source_type == "Redshift":
                processing_inputs.append(
                    create_redshift_processing_input(data_def, name, processing_dir)
                )
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")
    return processing_inputs
def create_processing_output(output_name, output_path, processing_dir):
    return ProcessingOutput(
        output_name=output_name,
        source=os.path.join(processing_dir, "output"),
        destination=output_path,
        s3_upload_mode="EndOfJob",
    )
def create_container_arguments(output_name, output_content_type):
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]
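Before submitting a job, it can help to check which data sources a flow references and what the container argument will look like. The sketch below needs neither AWS access nor the SageMaker SDK; `summarize_flow_sources`, the sample flow, and the output name `node-id.default` are hypothetical stand-ins, and a real .flow file contains many more fields.

```python
import json


def summarize_flow_sources(flow):
    """List (name, source type) for every node that carries a dataset definition."""
    sources = []
    for node in flow["nodes"]:
        data_def = node.get("parameters", {}).get("dataset_definition")
        if data_def:
            sources.append((data_def["name"], data_def["datasetSourceType"]))
    return sources


# Hypothetical, heavily trimmed flow: two source nodes and one transform node.
example_flow = {
    "nodes": [
        {"parameters": {"dataset_definition": {"name": "fleet.csv", "datasetSourceType": "S3"}}},
        {"parameters": {"dataset_definition": {"name": "sensor_query", "datasetSourceType": "Athena"}}},
        {"parameters": {"operator_id": "some-transform"}},
    ]
}

sources = summarize_flow_sources(example_flow)
print(sources)

# The container argument is built the same way as in create_container_arguments.
output_config = {"node-id.default": {"content_type": "CSV"}}
container_argument = f"--output-config '{json.dumps(output_config)}'"
print(container_argument)
```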
Start ProcessingJob
Now, the Processing Job is submitted using the Processor class from the SageMaker Python SDK. Logs are turned off, but they can be turned on for debugging by setting logs=True.
[ ]:
%%time
from sagemaker.processing import Processor
processor = Processor(
role=iam_role,
image_uri=container_uri,
instance_count=instance_count,
instance_type=instance_type,
sagemaker_session=sess,
)
processor.run(
inputs=create_processing_inputs(processing_dir, flow, flow_uri),
outputs=[create_processing_output(output_name, output_path, processing_dir)],
arguments=create_container_arguments(output_name, output_content_type),
wait=True,
logs=False,
job_name=processing_job_name,
)
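If the job is submitted with `wait=False`, its status has to be checked separately. One possible approach, sketched below, polls the `describe_processing_job` call of the boto3 SageMaker client until the job reaches a terminal state. The function name, polling interval, and attempt limit are illustrative assumptions rather than part of the exported notebook.

```python
import time

# Terminal values of ProcessingJobStatus; "InProgress" and "Stopping" are transient.
TERMINAL_STATES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(sm_client, job_name, poll_seconds=30, max_polls=120):
    """Poll a processing job until it finishes; return (status, failure_reason)."""
    for _ in range(max_polls):
        description = sm_client.describe_processing_job(ProcessingJobName=job_name)
        status = description["ProcessingJobStatus"]
        if status in TERMINAL_STATES:
            # FailureReason is only present when the job failed.
            return status, description.get("FailureReason")
        time.sleep(poll_seconds)
    raise TimeoutError(f"{job_name} did not finish after {max_polls} polls")


# Usage (requires AWS credentials and a submitted job):
# import boto3
# status, reason = wait_for_processing_job(boto3.client("sagemaker"), processing_job_name)
```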
Kick off SageMaker Training Job (Optional)
Data Wrangler is a SageMaker tool for preparing data for machine learning. Now that the data has been processed, the next step is usually to train a model on it. The following example does so with XGBoost, a popular gradient-boosted tree algorithm.
Note that the XGBoost objective (for example binary classification, regression, or multiclass classification), the hyperparameters, and the content_type used below may not suit the output data and may need to change before a proper model can be trained. Furthermore, for CSV training, the algorithm assumes that the target variable is in the first column. For more information on SageMaker XGBoost, see https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html.
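If the processed CSV does not already have the target in the first column, the columns need to be reordered before training. The following is a minimal sketch using only the standard library; the function name `move_target_first`, the column names, and the sample rows are made up for illustration. The SageMaker XGBoost documentation also states that CSV training data should not include a header record, which is why the sketch can drop it.

```python
import csv
import io


def move_target_first(lines, target, keep_header=True):
    """Return CSV text with the `target` column moved to the first position."""
    reader = csv.reader(lines)
    header = next(reader)
    idx = header.index(target)  # Raises ValueError if the column is missing.

    def reorder(row):
        return [row[idx]] + row[:idx] + row[idx + 1:]

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    if keep_header:
        writer.writerow(reorder(header))
    for row in reader:
        writer.writerow(reorder(row))
    return out.getvalue()


# Hypothetical sample with the target ("failure") in the last column.
sample = "voltage,rpm,failure\n1.2,900,0\n1.5,950,1\n"
reordered = move_target_first(io.StringIO(sample), "failure", keep_header=False)
print(reordered)
```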
Find Training Data path
The following cell lists the objects under the output prefix to find the location of the processed data, skipping the _SUCCESS marker file.
[ ]:
s3_client = boto3.client("s3")
list_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=output_prefix)
training_path = None
# "Contents" is absent from the response when no objects match the prefix.
for content in list_response.get("Contents", []):
    if "_SUCCESS" not in content["Key"]:
        training_path = content["Key"]
print(training_path)
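A single `list_objects_v2` call returns at most 1,000 keys, so a job that writes many files may need pagination. The sketch below uses the boto3 `list_objects_v2` paginator and collects every data file instead of only the last one; `find_training_keys` is a hypothetical helper name.

```python
def find_training_keys(s3_client, bucket, prefix):
    """Return all object keys under `prefix`, excluding _SUCCESS marker files."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page has no "Contents" entry when nothing matches the prefix.
        for obj in page.get("Contents", []):
            if not obj["Key"].endswith("_SUCCESS"):
                keys.append(obj["Key"])
    return keys


# Usage (requires AWS credentials):
# import boto3
# keys = find_training_keys(boto3.client("s3"), bucket, output_prefix)
```

If the job wrote more than one data file, passing the common prefix rather than a single key to the training input is one way to use all of them.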
Next, the Training Job hyperparameters are set. For more information on XGBoost Hyperparameters, see https://xgboost.readthedocs.io/en/latest/parameter.html.
[9]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
"max_depth": "5",
"objective": "reg:squarederror",
"num_round": "10",
}
train_content_type = (
"application/x-parquet" if output_content_type.upper() == "PARQUET" else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
s3_data=f"s3://{bucket}/{training_path}",
content_type=train_content_type,
)
The TrainingJob configuration is set using the SageMaker Python SDK Estimator, which is then fit on the training data produced by the ProcessingJob that was run earlier.
[ ]:
estimator = sagemaker.estimator.Estimator(
container,
iam_role,
hyperparameters=hyperparameters,
instance_count=1,
instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})
Cleanup
Uncomment the following code cell to revert the SageMaker Python SDK to the version that was installed before running this notebook. This notebook installs version 2.x of the SageMaker Python SDK, which may cause other example notebooks to break. To learn more about the changes introduced in the 2.x update, see Use Version 2.x of the SageMaker Python SDK.
[ ]:
# _ = subprocess.check_call(
# [sys.executable, "-m", "pip", "install", f"sagemaker=={original_version}"]
# )