Computer Vision for Medical Imaging: Part 4. SageMaker Pipelines
This notebook is the final part of a 4-part series on the techniques and services offered by SageMaker to build a model which predicts whether an image of cells contains cancer. This notebook describes how to automate the ML workflow using SageMaker Pipelines.
Dataset
The dataset for this demo comes from the Camelyon16 Challenge, made available under the CC0 license. The raw data provided by the challenge has been processed into 96x96 pixel tiles by Bas Veeling and is also made available under the CC0 license. For detailed information on each dataset, please see the papers below:
* Ehteshami Bejnordi et al. Diagnostic Assessment of Deep Learning Algorithms for Detection of Lymph Node Metastases in Women With Breast Cancer. JAMA: The Journal of the American Medical Association, 318(22), 2199–2210. doi:10.1001/jama.2017.14585
* B. S. Veeling, J. Linmans, J. Winkens, T. Cohen, M. Welling. “Rotation Equivariant CNNs for Digital Pathology”. arXiv:1806.03962
The tiled dataset from Bas Veeling is over 6GB of data. In order to easily run this demo, the dataset has been pruned to the first 14,000 images of the tiled dataset and comes included in the repo with this notebook for convenience.
Update SageMaker SDK and Boto3
NOTE: You may get an error from pip’s dependency resolver; you can ignore this error.
[ ]:
# Restore every variable previously saved with %store into this notebook's namespace.
# NOTE(review): later cells use `prefix`, `mpg_name`, `num_training_samples` and
# `deploy_instance_type` without defining them — presumably they are restored here
# from the earlier notebooks in the series; confirm.
%store -r
# List the variables currently held in the store.
%store
Import Libraries
[ ]:
import boto3
import sagemaker
import numpy as np
import matplotlib.pyplot as plt
import cv2
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
Configure Boto3 Clients and Sessions
[ ]:
# Region that every client and session below is pinned to.
region = "us-west-2"  # Change region as needed

# Also make it the region of boto3's default session, for any client that a
# library creates implicitly.
boto3.setup_default_session(region_name=region)

# One explicit boto3 session; both service clients are derived from it so they
# are guaranteed to agree on region and credentials.
boto_session = boto3.Session(region_name=region)
s3_client = boto_session.client("s3")
sagemaker_boto_client = boto_session.client("sagemaker")

# SageMaker SDK session wrapping the boto3 session and client created above.
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)

# IAM role the SageMaker jobs and the pipeline run under.
sagemaker_role = sagemaker.get_execution_role()

# Look the default bucket up through the session configured above instead of
# constructing a second, throw-away sagemaker.Session just for this call.
bucket = sagemaker_session.default_bucket()
Configure the Estimator
[ ]:
# Container image URI of the "image-classification" algorithm for the chosen region.
training_image = sagemaker.image_uris.retrieve("image-classification", region)

# Hyperparameters handed to the training container.
hyperparameters = {
    "num_layers": 18,
    "use_pretrained_model": 1,
    "augmentation_type": "crop_color_transform",
    "image_shape": "3,96,96",  # the dataset consists of 96x96 pixel tiles
    "num_classes": 2,
    # NOTE(review): `num_training_samples` is not defined in this notebook;
    # presumably restored by `%store -r` — confirm.
    "num_training_samples": num_training_samples,
    "mini_batch_size": 64,
    "epochs": 5,
    "learning_rate": 0.01,
    "precision_dtype": "float32",
}

estimator_config = {
    "hyperparameters": hyperparameters,
    "image_uri": training_image,
    # Reuse the role already resolved for this notebook rather than calling
    # get_execution_role() a second time.
    "role": sagemaker_role,
    "instance_count": 1,
    "instance_type": "ml.p3.2xlarge",
    "volume_size": 100,
    "max_run": 360000,
    "output_path": f"s3://{bucket}/{prefix}/training_jobs",
    # Use the same region-pinned session as the Model and the deploy processor
    # further down, so every pipeline step talks to the same account/region.
    "sagemaker_session": sagemaker_session,
}

image_classifier = sagemaker.estimator.Estimator(**estimator_config)
Pipeline
Step 1: Create RecordIO Splits
[ ]:
# S3 prefix under which the raw dataset is stored.
base_uri = f"s3://{bucket}/{prefix}/data"

# Upload the local HDF5 file through the session configured for this notebook
# (instead of an implicit default session) and keep the resulting S3 URI.
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path="data/camelyon16_tiles.h5",
    desired_s3_uri=base_uri,
    sagemaker_session=sagemaker_session,
)

# Pipeline parameter so a different dataset location can be supplied per execution;
# defaults to the file uploaded above.
input_data = ParameterString(name="InputData", default_value=input_data_uri)
[ ]:
# Stage the processing script in S3 so the processing job can download it.
s3_client.upload_file(Filename="split_data.py", Bucket=bucket, Key=f"{prefix}/code/split_data.py")
split_data_script_uri = f"s3://{bucket}/{prefix}/code/split_data.py"

split_data_instance_type = "ml.t3.large"

# Processor that runs split_data.py. The session is passed explicitly so this
# processor is configured the same way as `deploy_model_processor` below.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=split_data_instance_type,
    instance_count=1,
    base_job_name="image-classication-split-data",
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
)

# Pipeline step: takes the `InputData` parameter as input and exposes three
# named outputs (train / val / test) that later steps reference by name.
split_data_step = ProcessingStep(
    name="SplitData",
    processor=sklearn_processor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=input_data, destination="/opt/ml/processing/input"
        ),
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="train_data", source="/opt/ml/processing/output/data/train"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="val_data", source="/opt/ml/processing/output/data/val"
        ),
        sagemaker.processing.ProcessingOutput(
            output_name="test_data", source="/opt/ml/processing/output/data/test"
        ),
    ],
    code=split_data_script_uri,
)
Step 2: Train Model
[ ]:
# Map each training channel to the SplitData output that feeds it. Both
# channels share the same content type, S3 data type and input mode, so the
# TrainingInput objects are built in one pass.
train_step_inputs = {
    channel: sagemaker.inputs.TrainingInput(
        s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs[
            output_name
        ].S3Output.S3Uri,
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
        input_mode="Pipe",
    )
    for channel, output_name in (
        ("train", "train_data"),
        ("validation", "val_data"),
    )
}

# Pipeline step that trains the image classifier on those channels.
train_step = TrainingStep(
    name="TrainModel",
    estimator=image_classifier,
    inputs=train_step_inputs,
)
Step 3: Register Model
[ ]:
# Pipeline parameter holding the approval status given to the registered model;
# an execution may override the default.
model_approval_status = ParameterString(
    default_value="PendingManualApproval",
    name="ModelApprovalStatus",
)

# Register the artifacts produced by the training step in the model package group.
register_step = RegisterModel(
    name="RegisterModel",
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
    estimator=image_classifier,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    # Payload formats and instance types recorded with the model package.
    content_types=["image/jpeg"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)
Step 4: Create Model
[ ]:
# Model definition whose image and artifacts are both taken from the training
# step's properties.
model = sagemaker.model.Model(
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
    name=f"{mpg_name}-pipline",
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

# Pipeline step that creates the SageMaker model; the deploy step reads its
# `ModelName` property.
create_model_step = CreateModelStep(
    name="ModelPreDeployment",
    model=model,
    inputs=inputs,
)
Step 5: Deploy Model
[ ]:
# Stage the deployment script in S3 so the processing job can download it.
s3_client.upload_file(
    Filename="deploy_model.py",
    Bucket=bucket,
    Key=f"{prefix}/code/deploy_model.py",
)
deploy_model_script_uri = f"s3://{bucket}/{prefix}/code/deploy_model.py"

# Processor that runs deploy_model.py.
deploy_model_processor = SKLearnProcessor(
    base_job_name=f"{prefix}-deploy-model",
    framework_version="0.23-1",
    instance_type="ml.t3.medium",
    instance_count=1,
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
)

# Pipeline step that invokes the script with the model created by the previous
# step and the target endpoint settings as command-line arguments.
# NOTE(review): `deploy_instance_type` is not defined in this notebook;
# presumably restored by `%store -r` — confirm.
deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    code=deploy_model_script_uri,
    job_arguments=[
        "--model-name",
        create_model_step.properties.ModelName,
        "--region",
        region,
        "--endpoint-instance-type",
        deploy_instance_type,
        "--endpoint-name",
        "cv-model-pipeline",
    ],
)
Create Pipeline
[ ]:
pipeline_name = f"{prefix}-pipeline"

# Assemble the five steps defined above, together with the two parameters an
# execution may override, into a single pipeline definition.
pipeline = Pipeline(
    name=pipeline_name,
    steps=[
        split_data_step,
        train_step,
        register_step,
        create_model_step,
        deploy_step,
    ],
    parameters=[
        input_data,
        model_approval_status,
    ],
)

# Create the pipeline, or update it if one with this name already exists.
pipeline.upsert(role_arn=sagemaker_role)
[ ]:
# Override the default "PendingManualApproval" status for this execution.
parameters = dict(ModelApprovalStatus="Approved")

# Start an execution and block until it finishes or 100 polling attempts have
# been used up.
start_response = pipeline.start(parameters=parameters)
start_response.wait(max_attempts=100)

# Final expression of the cell, so the notebook displays the execution description.
start_response.describe()
Lineage
Review the lineage of the artifacts generated by the pipeline.
[ ]:
import time
from pprint import pprint

from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker_session)

# Iterate over the execution's steps in the reverse of the order in which
# list_steps() returns them, printing each step and showing its lineage table.
for execution_step in reversed(start_response.list_steps()):
    pprint(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    # Short pause between steps — presumably to avoid throttling of the
    # lineage API; confirm.
    time.sleep(5)
Clean up resources
[ ]:
# Delete the endpoint deployed by this notebook's pipeline. The DeployModel
# step passes "--endpoint-name cv-model-pipeline" to deploy_model.py, so that
# is presumably the endpoint the script created (confirm against the script).
# The notebook's own session is used because `best_model` is not defined
# anywhere in this notebook.
sagemaker_session.delete_endpoint("cv-model-pipeline")
[ ]: