Music Recommender Part 5: Model Monitor
In this notebook, we’ll set up SageMaker Model Monitor to detect when our model or data significantly deviates from its “normal” behavior. SageMaker Model Monitor provides the ability to monitor machine learning models in production and detect deviations in data quality in comparison to a baseline dataset (e.g. the training dataset). This notebook walks you through enabling data capture and setting up continuous monitoring for an existing Endpoint.
This notebook helps with the following:
* Update your existing SageMaker Endpoint to enable Model Monitoring
* Analyze the training dataset to generate a baseline constraint
* Set up a MonitoringSchedule for monitoring deviations from the specified baseline
Contents
Step 1: Enable real-time inference data capture
To enable data capture for monitoring the model data quality, you specify the new capture option called DataCaptureConfig. You can capture the request payload, the response payload or both with this configuration. The capture config applies to all variants. Please provide the Endpoint name in the following cell:
[ ]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.predictor import Predictor
from sagemaker import session
import boto3
[ ]:
import sys
import pprint
sys.path.insert(1, './code')
from parameter_store import ParameterStore
ps = ParameterStore(verbose=False)
parameters = ps.read('music-rec')
bucket = parameters['bucket']
dw_ecrlist = parameters['dw_ecrlist']
fg_name_ratings = parameters['fg_name_ratings']
fg_name_tracks = parameters['fg_name_tracks']
fg_name_user_preferences = parameters['fg_name_user_preferences']
flow_export_id = parameters['flow_export_id']
flow_s3_uri = parameters['flow_s3_uri']
pretrained_model_path = parameters['pretrained_model_path']
prefix = parameters['prefix']
ratings_data_source = parameters['ratings_data_source']
tracks_data_source = parameters['tracks_data_source']
endpoint_name = parameters['endpoint_name']
val_data_uri = parameters['val_data_uri']
[ ]:
sm_session = session.Session(boto3.Session())
region = boto3.Session().region_name
[ ]:
# Please fill in the following for enabling data capture
s3_capture_upload_path = f's3://{bucket}/{prefix}/endpoint-data-capture/' #example: s3://bucket-name/path/to/endpoint-data-capture/
#####
## IMPORTANT
##
## Please make sure to add the "s3:PutObject" permission to the "role" you provided in the SageMaker Model
## behind this Endpoint. Otherwise, Endpoint data capture will not work.
##
#####
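The permission mentioned in the comment above could be granted with a policy document along the following lines. This is a minimal sketch: the helper name `build_capture_policy` and the sample bucket and prefix are hypothetical, and your role may need additional permissions depending on your bucket and encryption settings.

```python
import json


def build_capture_policy(bucket: str, prefix: str) -> dict:
    """Return an IAM policy document allowing s3:PutObject under the capture path."""
    # Hypothetical helper for illustration; it is not part of the SageMaker SDK.
    # The path mirrors s3_capture_upload_path from the cell above.
    resource = f"arn:aws:s3:::{bucket}/{prefix.strip('/')}/endpoint-data-capture/*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": resource,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder values; substitute your own bucket and prefix.
    print(json.dumps(build_capture_policy("my-bucket", "music-rec"), indent=2))
```

The resulting JSON can be attached to the execution role used by the model behind the Endpoint.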
[ ]:
%%time
# Change parameters as you would like - adjust sampling percentage,
# choose to capture request or response or both
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=25,
    destination_s3_uri=s3_capture_upload_path,
    kms_key_id=None,
    capture_options=["REQUEST", "RESPONSE"],
    csv_content_types=["text/csv"],
    json_content_types=["application/json"]
)
# Now it is time to apply the new configuration and wait for it to be applied
predictor = Predictor(endpoint_name=endpoint_name)
predictor.update_data_capture_config(data_capture_config=data_capture_config)
sm_session.wait_for_endpoint(endpoint=endpoint_name)
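Once capture is enabled, sampled requests and responses are written to the S3 path as JSON Lines files. The sketch below shows one way to pull the request and response payloads out of a single captured line. The sample record is made up for illustration, its layout is assumed to follow the documented data capture format, and `parse_capture_line` is a hypothetical helper.

```python
import json

# Illustrative record only: the values are invented and the layout is an
# assumption based on the documented data capture format.
SAMPLE_LINE = json.dumps({
    "captureData": {
        "endpointInput": {
            "observedContentType": "text/csv",
            "mode": "INPUT",
            "data": "0.1,0.5,3",
            "encoding": "CSV",
        },
        "endpointOutput": {
            "observedContentType": "text/csv",
            "mode": "OUTPUT",
            "data": "4.2",
            "encoding": "CSV",
        },
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2021-01-01T00:00:00Z"},
    "eventVersion": "0",
})


def parse_capture_line(line: str) -> dict:
    """Extract the event id, request payload, and response payload from one line."""
    record = json.loads(line)
    capture = record["captureData"]
    # Either side may be missing when only REQUEST or only RESPONSE is captured.
    request = capture.get("endpointInput", {}).get("data")
    response = capture.get("endpointOutput", {}).get("data")
    return {
        "event_id": record["eventMetadata"]["eventId"],
        "request": request,
        "response": response,
    }
```

Applying this to each line of a downloaded capture file gives a quick way to confirm that traffic is actually being recorded.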
Before you proceed:
Currently SageMaker supports monitoring Endpoints out of the box only for tabular (CSV, flat JSON) datasets. If your Endpoint uses a different data format, the following steps will NOT work for you.
Step 2: Model Monitor - Baselining
In addition to collecting the data, SageMaker allows you to monitor and evaluate the data observed by the Endpoints. For this:
1. We need to create a baseline against which we compare the real-time traffic.
2. Once a baseline is ready, we can set up a schedule to continuously evaluate and compare against the baseline.
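To make the idea of comparing traffic against a baseline concrete, here is a toy sketch of such a check. It is purely conceptual and does not reproduce what the Model Monitor container does internally; the function name, the rule keys (`min_completeness`, `non_negative`), and the feature name `danceability` are all hypothetical.

```python
from typing import Dict, List, Optional


def find_violations(
    rows: List[Dict[str, Optional[float]]],
    constraints: Dict[str, dict],
) -> List[str]:
    """Compare observed rows with per-feature baseline rules and list violations."""
    violations = []
    for feature, rule in constraints.items():
        values = [row.get(feature) for row in rows]
        present = [v for v in values if v is not None]
        # Completeness is the fraction of rows in which the feature is present.
        completeness = len(present) / len(values) if values else 0.0
        if completeness < rule["min_completeness"]:
            violations.append(
                f"{feature}: completeness {completeness:.2f} "
                f"below {rule['min_completeness']:.2f}"
            )
        if rule.get("non_negative") and any(v < 0 for v in present):
            violations.append(f"{feature}: negative value observed")
    return violations
```

A scheduled monitoring run can be thought of as applying checks of this kind to each batch of captured data and reporting whatever violates the baseline.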
Constraint suggestion with baseline/training dataset
The training dataset with which you trained the model is usually a good baseline dataset. Note that the training dataset’s data schema and the inference dataset schema should exactly match (i.e. number and order of the features).
Using our training dataset, we’ll ask SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data.
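Because the baseline and inference schemas need to line up, a quick sanity check before baselining can save a failed job. The sketch below compares column counts between a headerless baseline CSV sample and an inference payload. The helper names are hypothetical, and the check covers only the number of columns, since column order cannot be verified from headerless data alone.

```python
import csv
import io


def column_counts(csv_text: str) -> set:
    """Return the set of distinct column counts across non-empty CSV rows."""
    reader = csv.reader(io.StringIO(csv_text))
    return {len(row) for row in reader if row}


def same_width(baseline_csv: str, inference_csv: str) -> bool:
    """True when both inputs are internally consistent and equally wide."""
    baseline = column_counts(baseline_csv)
    inference = column_counts(inference_csv)
    # A single shared column count means every row has the same width.
    return len(baseline) == 1 and baseline == inference
```

For example, `same_width("1,2,3\n4,5,6\n", "7,8,9\n")` evaluates to `True`, while a two-column payload against a three-column baseline evaluates to `False`.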
[ ]:
##'s3://bucketname/path/to/baseline/data' - Where your validation data is
baseline_data_uri = val_data_uri
##'s3://bucketname/path/to/baseline/results' - Where the results are to be stored
baseline_results_uri = f's3://{bucket}/{prefix}/baseline/results'
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))
Create a baselining job with the validation dataset
Now that the baseline data (here, the validation dataset) is ready in S3, let’s kick off a job to suggest constraints. DefaultModelMonitor.suggest_baseline(..) kicks off a ProcessingJob using a SageMaker-provided Model Monitor container to generate the constraints. Please edit the configurations to fit your needs.
[ ]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker import get_execution_role
import datetime
role = get_execution_role(sagemaker_session=sm_session)
datetime_stamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=2,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    base_job_name=f"{prefix}-monitor-{datetime_stamp}"
)
[ ]:
%%time
monitor_baseline = my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    job_name=f"{prefix}-monitor-baseline-{datetime_stamp}",
    wait=True
)
Exploratory Analysis of the Processing Jobs underlying SageMaker Monitor
In this short section (the next few cells), we show how to inspect the processing jobs that underlie the monitoring job.
[ ]:
from time import gmtime, strftime
import boto3
client = boto3.client('sagemaker')
def get_last_processing_job():
    # List completed baselining jobs for this run, newest first
    response = client.list_processing_jobs(
        NameContains=f"{prefix}-monitor-baseline-{datetime_stamp}",
        StatusEquals='Completed',
        SortBy='CreationTime',
        SortOrder='Descending',
        MaxResults=20
    )
    pprint.pprint(response['ProcessingJobSummaries'][0])
    return response['ProcessingJobSummaries'][0]['ProcessingJobName']
[ ]:
from sagemaker.processing import ProcessingJob
from sagemaker.estimator import Estimator
from sagemaker.model_monitor.model_monitoring import ModelMonitor
my_default_monitor_name = get_last_processing_job()
[ ]:
my_default_monitor_reload = ProcessingJob.from_processing_name(sm_session, my_default_monitor_name)
response = client.describe_processing_job(
    ProcessingJobName=my_default_monitor_name
)
pprint.pprint(response)
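Indexing the first summary directly raises an IndexError when no completed job matches the filter. A more defensive variant might look like the sketch below, which works on a response dictionary shaped like the one returned by list_processing_jobs. The helper name `latest_job_name` and the sample response are hypothetical.

```python
import datetime
from typing import Optional

# Made-up response for illustration; only the two keys used below are included.
SAMPLE_RESPONSE = {
    "ProcessingJobSummaries": [
        {"ProcessingJobName": "job-a", "CreationTime": datetime.datetime(2021, 1, 1)},
        {"ProcessingJobName": "job-b", "CreationTime": datetime.datetime(2021, 1, 2)},
    ]
}


def latest_job_name(response: dict) -> Optional[str]:
    """Return the most recently created job name, or None if there are no jobs."""
    summaries = response.get("ProcessingJobSummaries", [])
    if not summaries:
        return None
    # Pick the newest explicitly so the result does not rely on the sort order.
    newest = max(summaries, key=lambda summary: summary["CreationTime"])
    return newest["ProcessingJobName"]
```

Returning None lets the caller decide whether to wait, retry, or report that the baselining job has not completed yet.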
Explore the generated constraints and statistics
[ ]:
import pandas as pd
baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize replaces the deprecated pd.io.json.json_normalize (pandas >= 1.0)
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)
[ ]:
# pd.json_normalize replaces the deprecated pd.io.json.json_normalize (pandas >= 1.0)
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)
Before proceeding to enable monitoring, you can choose to edit the constraints file as required to fine-tune the constraints.
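As one example of fine-tuning, you might relax the completeness requirement for a feature that is legitimately missing in some requests. The sketch below edits an in-memory copy of the constraints. The sample content is invented, its layout is assumed to follow the constraints file produced by the baselining job, and `relax_completeness` is a hypothetical helper.

```python
import copy

# Illustrative constraints only: feature names and values are made up, and the
# layout is an assumption based on the generated constraints file.
SAMPLE_CONSTRAINTS = {
    "version": 0.0,
    "features": [
        {"name": "_c0", "inferred_type": "Fractional", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
        {"name": "_c1", "inferred_type": "Integral", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
    ],
}


def relax_completeness(constraints: dict, feature_name: str, completeness: float) -> dict:
    """Return a copy of the constraints with one feature's completeness changed."""
    updated = copy.deepcopy(constraints)  # leave the original untouched
    for feature in updated["features"]:
        if feature["name"] == feature_name:
            feature["completeness"] = completeness
            return updated
    raise KeyError(f"feature {feature_name!r} not found")
```

The edited dictionary would then need to be written back as JSON and uploaded to S3 before it can be used when creating the schedule.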
Step 3: Enable continuous monitoring
We have enabled data capture and created a baseline above. Here we proceed to analyze and monitor the captured data with MonitoringSchedules.
Create a schedule
We are ready to create a model monitoring schedule for the Endpoint created earlier with the baseline resources (constraints and statistics).
[ ]:
from sagemaker.model_monitor import CronExpressionGenerator
import datetime as datetime
from time import gmtime, strftime
mon_schedule_name = 'music-rec-monitor-schedule-{}'.format(datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S"))
s3_report_path = f's3://{bucket}/{prefix}/monitor/report'
try:
    my_default_monitor.create_monitoring_schedule(
        monitor_schedule_name=mon_schedule_name,
        endpoint_input=endpoint_name,
        output_s3_uri=s3_report_path,
        statistics=my_default_monitor.baseline_statistics(),
        constraints=my_default_monitor.suggested_constraints(),
        schedule_cron_expression=CronExpressionGenerator.daily(),
        enable_cloudwatch_metrics=True,
    )
    print(f"Created monitoring schedule {mon_schedule_name}")
except Exception:
    # Creation failed (for example, a schedule is already attached to this monitor),
    # so update the existing schedule instead
    my_default_monitor.update_monitoring_schedule(
        endpoint_input=endpoint_name,
        schedule_cron_expression=CronExpressionGenerator.daily(),
        enable_cloudwatch_metrics=True,
    )
    print(f"Updated monitoring schedule {my_default_monitor.monitoring_schedule_name}")
[ ]:
import time
desc_schedule_result = my_default_monitor.describe_schedule()
# Poll every 30 seconds until the schedule reaches the 'Scheduled' state
while desc_schedule_result['MonitoringScheduleStatus'] != 'Scheduled':
    print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))
    desc_schedule_result = my_default_monitor.describe_schedule()
    time.sleep(30)
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))
Schedule status: Pending
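The polling loop above runs until the status becomes Scheduled, so it never exits if the schedule fails. A bounded variant could look like the following sketch. The helper `wait_for_status` and its parameters are hypothetical; it takes any callable that returns a dictionary with a MonitoringScheduleStatus key.

```python
import time
from typing import Callable


def wait_for_status(
    describe: Callable[[], dict],
    target: str = "Scheduled",
    timeout_s: float = 600.0,
    poll_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll describe() until the target status is seen, failing fast on errors."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = describe()["MonitoringScheduleStatus"]
        if status == target:
            return status
        if status == "Failed":
            raise RuntimeError("monitoring schedule failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"schedule still {status!r} after {timeout_s} seconds")
        sleep(poll_s)
```

In this notebook it could be called as `wait_for_status(my_default_monitor.describe_schedule)`. The injectable `sleep` argument exists only so the loop can be exercised without real delays.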
All set
Now that your monitoring schedule has been created, please return to Amazon SageMaker Studio to list the executions for this schedule and observe the results going forward.