Music Recommender Part 4: Deploy Model & Inference using Online Feature Store
In this notebook, we’ll deploy our chosen model as an endpoint so that we can make predictions (inferences) against it. Under the hood, the `deploy` function creates a model, an endpoint configuration, and an endpoint.
Then we’ll make music recommendations for a single user by inferencing against our model. We’ll query our Feature Store to get some data to use for inferencing and show you how SageMaker Clarify can explain which features were most useful in making the recommended music predictions using SHAP values.
Amazon SageMaker Clarify provides tools to help explain how machine learning models make predictions. These tools can help ML modelers and developers and other internal stakeholders understand model characteristics as a whole prior to deployment and to debug predictions provided by the model after it’s deployed. Transparency about how ML models arrive at their predictions is also critical to consumers and regulators who need to trust the model predictions if they are going to accept the decisions based on them.
Contents
[ ]:
try:
!pip install -U awswrangler
except ModuleNotFoundError:
!pip install --no-input awswrangler
[ ]:
# Pin pandas to 1.2.0 to avoid data type issues in the older 1.0 version.
# A later cell also relies on DataFrame.merge(how='cross'), which needs pandas >= 1.2.
!pip install -qU pandas==1.2.0
import pandas as pd
# Print the version that was actually imported into this kernel.
print(pd.__version__)
[ ]:
import time
import boto3
import argparse
import pathlib
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.estimator import Estimator
import awswrangler as wr
import os
import json
import matplotlib.pyplot as plt
import numpy as np
[ ]:
import sys
import pprint
# Make the helper modules under ./code importable, then load the
# project-local ParameterStore class from there.
sys.path.insert(1, './code')
from parameter_store import ParameterStore

# Read every value stored under the 'music-rec' namespace
# (presumably written by the earlier notebooks in this series).
ps = ParameterStore(verbose=False)
parameters = ps.read('music-rec')

# S3 locations and data sources.
bucket = parameters['bucket']
prefix = parameters['prefix']
flow_s3_uri = parameters['flow_s3_uri']
ratings_data_source = parameters['ratings_data_source']
tracks_data_source = parameters['tracks_data_source']
train_data_uri = parameters['train_data_uri']
pretrained_model_path = parameters['pretrained_model_path']

# Data Wrangler / flow identifiers.
dw_ecrlist = parameters['dw_ecrlist']
flow_export_id = parameters['flow_export_id']

# Feature group names.
fg_name_ratings = parameters['fg_name_ratings']
fg_name_tracks = parameters['fg_name_tracks']
fg_name_user_preferences = parameters['fg_name_user_preferences']

# Model artifacts; `model_name` used to be assigned twice, once is enough.
model_name = parameters['model_name']
training_job_name = parameters['training_job_name']
mpg_name = parameters['mpg_name']

# Ordered list of feature columns used to build the inference rows.
feature_names = parameters['feature_names']
[ ]:
# Default SageMaker session, created before the boto3 default session is configured.
sess = sagemaker.Session()

# Resolve the region once and make it the default for module-level boto3 clients.
region = boto3.Session().region_name
boto3.setup_default_session(region_name=region)

# Explicit boto3 session shared by the SageMaker clients below.
boto_session = boto3.Session(region_name=region)

# Clients built from the default boto3 session.
s3_client = boto3.client('s3')
account_id = boto3.client('sts').get_caller_identity()['Account']

# SageMaker low-level client and the SDK session that wraps it.
sagemaker_client = boto_session.client('sagemaker', region_name=region)
sagemaker_session = sagemaker.session.Session(
    sagemaker_client=sagemaker_client,
    boto_session=boto_session,
)

# Execution role used by the model and the Clarify processor.
sagemaker_role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)
Deploy Model
back to top
[ ]:
# Derive the endpoint name from the model name and show it.
endpoint_name = f'{model_name}-endpoint-notebooks'
print(endpoint_name)

# Record the endpoint name in the 'music-rec' namespace and persist the store.
ps.add({'endpoint_name': endpoint_name}, namespace='music-rec')
ps.store()
[ ]:
# Set use_pretrained = True to deploy the pretrained model artifact
# (e.g. if you skipped model training in the last notebook);
# leave it False to use the model you trained in the previous notebook.
use_pretrained = False

if not use_pretrained:
    print(training_job_name)
    # Re-attach to the training job to recover the trained estimator.
    xgb_estimator = Estimator.attach(training_job_name)
else:
    # Wrap the pretrained artifact in a Model backed by the XGBoost 0.90-2 image.
    xgb_estimator = sagemaker.model.Model(
        role=sagemaker_role,
        model_data=pretrained_model_path,
        image_uri=sagemaker.image_uris.retrieve("xgboost", region, "0.90-2"),
    )
[ ]:
# Look up in-service endpoints whose name contains `endpoint_name`,
# newest first, and display the raw response.
endpoint_list = sagemaker_client.list_endpoints(
    NameContains=endpoint_name,
    StatusEquals='InService',
    SortBy='CreationTime',
    SortOrder='Descending',
)
endpoint_list
Create endpoint
[ ]:
%%time
if len(endpoint_list['Endpoints']) > 0:
print(f"Using existing endpoint: {endpoint_list['Endpoints'][0]['EndpointName']}")
else:
# deploy endpoint for model if it doesn't already exist
xgb_estimator.deploy(initial_instance_count=1,
instance_type='ml.m4.xlarge',
model_name=model_name,
endpoint_name=endpoint_name
)
[ ]:
# Take the first model package listed for the model package group.
model_package = sagemaker_client.list_model_packages(
    ModelPackageGroupName=mpg_name
)['ModelPackageSummaryList'][0]

# Mark that package as approved.
model_package_update = dict(
    ModelPackageArn=model_package['ModelPackageArn'],
    ModelApprovalStatus='Approved',
)
update_response = sagemaker_client.update_model_package(**model_package_update)
Create a predictor
back to top
[ ]:
# Client-side handle used to send inference requests to the endpoint.
predictor = sagemaker.predictor.Predictor(
    sagemaker_session=sagemaker_session,
    endpoint_name=endpoint_name,
)
Pull user data from feature group
[ ]:
# Arbitrary user ID used for the feature store lookup and the predictions below.
# You can try any other ID.
sample_user_id = 11005
[ ]:
# Runtime client for reading individual records from the online feature store.
featurestore_runtime = boto_session.client(
    'sagemaker-featurestore-runtime',
    region_name=region,
)

# SageMaker session that also carries the feature store runtime client.
feature_store_session = sagemaker.Session(
    sagemaker_featurestore_runtime_client=featurestore_runtime,
    sagemaker_client=sagemaker_client,
    boto_session=boto_session,
)
[ ]:
# Pull the sample user's 5 star preferences record from the feature store.
fg_response = featurestore_runtime.get_record(
    FeatureGroupName=fg_name_user_preferences,
    RecordIdentifierValueAsString=str(sample_user_id)
)

# The response carries no 'Record' entry when the identifier is not in the
# feature group; fail with a descriptive KeyError instead of a bare
# KeyError('Record').
record = fg_response.get('Record')
if not record:
    raise KeyError(
        f"No record found for user {sample_user_id} "
        f"in feature group {fg_name_user_preferences!r}"
    )

# One row per feature, indexed by feature name; values are in 'ValueAsString'.
df_user = pd.DataFrame(record).set_index('FeatureName')
Pull sample of 1000 tracks from feature group
[ ]:
# Fetch a sample of track records (multiple rows) from the feature store through Athena.
fg_name_tracks_obj = FeatureGroup(
    sagemaker_session=feature_store_session,
    name=fg_name_tracks,
)
tracks_query = fg_name_tracks_obj.athena_query()
tracks_table = tracks_query.table_name

# The table name contains '-' characters, so it is wrapped in double quotes.
query_string = 'SELECT * FROM "{}" LIMIT 1000'.format(tracks_table)
print("Running " + query_string)

# Execute the query, wait for it to finish, and load the result into a pandas DataFrame.
tracks_query.run(
    output_location=f"s3://{bucket}/{prefix}/query_results/",
    query_string=query_string,
)
tracks_query.wait()
df_tracks = tracks_query.as_dataframe()
[ ]:
# Pair every sampled track with the single user's feature values (cross join).
data = df_tracks.merge(df_user['ValueAsString'].to_frame().T, how='cross')

# Lower-case the column names before selecting the feature columns.
data.columns = [col_name.lower() for col_name in data.columns]

# Keep only the feature columns, in the order given by `feature_names`.
inference_df = data[feature_names]
Format the datapoint
The datapoint must match the exact input format the model was trained on, with all features in the correct order. In this example, the feature order comes from the `feature_names` list, which was saved to the parameter store when you created the train and test datasets earlier in the guide.
[ ]:
# Serialize each inference row as one comma-separated line.
data_inputs = [','.join(map(str, row)) for row in inference_df.values]
Infer (predict) new songs using model
back to top
[ ]:
# Send each CSV row to the endpoint and collect the parsed responses in order.
predictions = []
for data_input in data_inputs:
    # The payload is a CSV line, so set the request content type accordingly.
    results = predictor.predict(
        data_input,
        initial_args={"ContentType": "text/csv"},
    )
    prediction = json.loads(results)
    predictions.append(prediction)
    print(f'Predicted rating for user {int(sample_user_id)}:', prediction)
[ ]:
# Attach the predictions as the first column. `assign` returns a new frame,
# which avoids pandas' SettingWithCopyWarning from assigning a column on a
# frame that was produced by selecting columns from another frame.
inference_df = inference_df.assign(rating=predictions)[['rating'] + feature_names]

# Make sure the local output directory exists before writing into it.
os.makedirs('data', exist_ok=True)

# Write to csv without headers and index column, then upload it to S3.
inference_df.to_csv('data/prediction_data.csv', header=False, index=False)
s3_client.upload_file('data/prediction_data.csv', bucket, f'{prefix}/data/pred/prediction_data.csv')
pred_data_uri = f's3://{bucket}/{prefix}/data/pred/prediction_data.csv'
[ ]:
# Name of the target column in the prediction data.
label = 'rating'
# Training data, used below to derive the SHAP baseline.
df_train = pd.read_csv(train_data_uri)
Explain model predictions
back to top
[ ]:
# S3 prefix that receives the Clarify explainability results.
explainability_output_path = 's3://{}/{}/clarify-output/explainability'.format(bucket, prefix)
[ ]:
# Processor that runs the Clarify job.
clarify_processor = sagemaker.clarify.SageMakerClarifyProcessor(
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
    instance_type='ml.c4.xlarge',
    instance_count=1,
)

# Model that Clarify queries while computing explanations.
model_config = sagemaker.clarify.ModelConfig(
    model_name=model_name,
    accept_type='text/csv',
    instance_type='ml.m4.xlarge',
    instance_count=1,
)

# SHAP settings: the baseline is the per-column median of the training data,
# skipping the first column because it holds the target.
shap_config = sagemaker.clarify.SHAPConfig(
    baseline=[df_train.median().values[1:].tolist()],
    agg_method='mean_abs',
    num_samples=100,
)

# Input data (label followed by the features) and the output location.
explainability_data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=pred_data_uri,
    s3_output_path=explainability_output_path,
    dataset_type='text/csv',
    headers=[label] + feature_names,
    label=label,
)
[ ]:
%%time
try:
s3_client.download_file(
Bucket = bucket,
Key = f'{prefix}/clarify-output/explainability/explanations_shap/out.csv',
Filename = 'data/shap_output.csv'
)
print('Downloaded output from previous explainability job')
except Exception as e:
error = e.response.get('Error').get('Code')
if error == '404':
print('Running explainability job')
clarify_processor.run_explainability(
data_config=explainability_data_config,
model_config=model_config,
explainability_config=shap_config)
[ ]:
# Add the track IDs back (aligned on the row index). `assign` returns a new
# frame, which avoids pandas' SettingWithCopyWarning from assigning a column
# on a frame that was produced by selecting columns from another frame.
inference_df = inference_df.assign(trackid=data['trackid'])
[ ]:
# How many songs to recommend in the playlist.
playlist_length = 10

# Rank the tracks by predicted rating, best first, and keep the top entries.
playlist = inference_df.sort_values('rating', ascending=False).iloc[:playlist_length]
print('Curated Playlist:\n', playlist['trackid'])
[ ]:
# Load the per-row SHAP values written by the explainability job and label
# the columns with the feature names.
local_explanations_out = pd.read_csv(explainability_output_path+'/explanations_shap/out.csv')
local_explanations_out.columns = feature_names

# The printed prediction belongs to the top playlist entry, so plot the
# explanation of that same row rather than always row 0 of the output.
# NOTE(review): assumes out.csv has one row per prediction row, in the same
# order as the uploaded CSV, and that inference_df kept its default
# 0..n-1 index — confirm against the Clarify job output.
top_row = playlist.index[0]
print("Model prediction:", playlist.iloc[0, 0])
plt.figure(figsize=(12,6))
local_explanations_out.iloc[top_row].sort_values().plot.barh(title='Local explanation for prediction')