Architect and Build a Music Recommender System across the Entire ML-Lifecycle with Amazon SageMaker

Overview


Welcome to the Music Recommender use case with Amazon SageMaker. In this series of notebooks, we will go through the ML lifecycle and show how to build a music recommender system using a combination of SageMaker services and features. Each phase has relevant notebooks that show you how to implement that phase of the lifecycle.


Architecture

Let’s look at the overall solution architecture for this use case. We will start by doing each of these tasks in the exploratory phase of the ML lifecycle. Once experimentation and trials are done, we can develop an automated pipeline, such as the one depicted here, to prepare the data, deposit it in the feature store, train and tune the model, deposit the model in the registry, deploy it to a SageMaker hosted endpoint, and run monitoring on it.


Solution Architecture

[ ]:
import sys
import pprint
sys.path.insert(1, './code')
from parameter_store import ParameterStore

ps = ParameterStore()
ps.create(namespace='music-rec')
[ ]:
# update pandas to avoid data type issues in older 1.0 version
!pip install pandas --upgrade --quiet
import pandas as pd
print(pd.__version__)
[ ]:
# create data folder
!mkdir -p data
[ ]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import json
import sagemaker
import boto3
import os
from awscli.customizations.s3.utils import split_s3_bucket_key

# Sagemaker session
sess = sagemaker.Session()
# get session bucket name
bucket = sess.default_bucket()
# bucket prefix or the subfolder for everything we produce
prefix='music-recommendation'
# s3 client
s3_client = boto3.client("s3")

print(f"this is your default SageMaker Studio bucket name: {bucket}")

ps.add({'bucket': bucket, 'prefix': prefix}, namespace='music-rec')
[ ]:
def get_data(public_s3_data, to_bucket, sample_data=1):
    new_paths = []
    for f in public_s3_data:
        bucket_name, key_name = split_s3_bucket_key(f)
        filename = f.split('/')[-1]
        new_path = "s3://{}/{}/{}".format(to_bucket, prefix, filename)
        new_paths.append(new_path)

        # only download if not already downloaded
        if not os.path.exists('./data/{}'.format(filename)):
            # download s3 data
            print("Downloading file from {}".format(f))
            s3_client.download_file(bucket_name, key_name, './data/{}'.format(filename))

        # subsample the data to create a smaller dataset for this demo
        new_df = pd.read_csv('./data/{}'.format(filename))
        new_df = new_df.sample(frac=sample_data)
        new_df.to_csv('./data/{}'.format(filename), index=False)

        # upload s3 data to our default s3 bucket for SageMaker Studio
        print("Uploading {} to {}\n".format(filename, new_path))
        s3_client.upload_file('./data/{}'.format(filename), to_bucket, os.path.join(prefix,filename))

    return new_paths


def get_model(model_path, to_bucket):
    # upload model to our default s3 bucket for SageMaker Studio
    filename = model_path.split('/')[-1]
    # build the S3 key with '/' explicitly; os.path.join is meant for local paths
    key = "{}/{}".format(prefix, filename)
    s3_uri = "s3://{}/{}".format(to_bucket, key)
    print("Uploading {} to {}\n".format(model_path, s3_uri))
    s3_client.upload_file(model_path, to_bucket, key)
    return s3_uri


def update_data_sources(flow_path, tracks_data_source, ratings_data_source):
    with open(flow_path) as flowf:
        flow = json.load(flowf)

    for node in flow['nodes']:
        # if the key exists for our s3 endpoint
        try:
            if node['parameters']['dataset_definition']['name'] == 'tracks.csv':
                # reset the s3 data source for tracks data
                old_source = node['parameters']['dataset_definition']['s3ExecutionContext']['s3Uri']
                print("Changed {} to {}".format(old_source, tracks_data_source))
                node['parameters']['dataset_definition']['s3ExecutionContext']['s3Uri'] = tracks_data_source
            elif node['parameters']['dataset_definition']['name'] == 'ratings.csv':
                # reset the s3 data source for ratings data
                old_source = node['parameters']['dataset_definition']['s3ExecutionContext']['s3Uri']
                print("Changed {} to {}".format(old_source, ratings_data_source))
                node['parameters']['dataset_definition']['s3ExecutionContext']['s3Uri'] = ratings_data_source
        except KeyError:
            # this node has no S3 dataset definition, so skip it
            continue
    # write out the updated json flow file
    with open(flow_path, 'w') as outfile:
        json.dump(flow, outfile)

    return flow

Prereqs: Get Data


Here we download the music data for this demo from a public S3 bucket and upload it to your default S3 bucket, which was created for you when you initially created your SageMaker Studio workspace.

[ ]:
# public S3 bucket that contains our music data
s3_bucket_music_data = "s3://sagemaker-sample-files/datasets/tabular/synthetic-music"
[ ]:
new_data_paths = get_data([f"{s3_bucket_music_data}/tracks.csv", f"{s3_bucket_music_data}/ratings.csv"], bucket, sample_data=0.70)
print(new_data_paths)
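
The get_data helper above splits each S3 URI into a bucket name and an object key using split_s3_bucket_key, which comes from the AWS CLI's internal modules. As a minimal sketch of the same idea using only the standard library, the hypothetical helper split_s3_uri below (not part of boto3 or the AWS CLI) does the split with urllib.parse. It assumes plain s3://bucket/key URIs and does not handle access points or other URI forms.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into (bucket, key).

    Hypothetical helper for illustration; assumes a plain s3:// URI.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    # netloc holds the bucket; the path holds the key with a leading '/'
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key_name = split_s3_uri(
    "s3://sagemaker-sample-files/datasets/tabular/synthetic-music/tracks.csv"
)
print(bucket_name)
print(key_name)
```

The returned pair can be passed to s3_client.download_file in the same way as the values returned by split_s3_bucket_key.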
[ ]:
# these are the new file paths located on your SageMaker Studio default s3 storage bucket
tracks_data_source = f's3://{bucket}/{prefix}/tracks.csv'
ratings_data_source = f's3://{bucket}/{prefix}/ratings.csv'

ps.add({'tracks_data_source': tracks_data_source, 'ratings_data_source': ratings_data_source}, namespace='music-rec')

Upload pretrained model

[ ]:
pretrained_model_path = get_model('./model/model.tar.gz', bucket)

ps.add({'pretrained_model_path': pretrained_model_path}, namespace='music-rec')
ps.store()

Update the data source in the .flow file


The 01_music_dataprep.flow file is a JSON file containing instructions for where to find your data sources and how to transform the data. We’ll update the object that tells Data Wrangler where to find the input data on S3, setting it to your default S3 bucket. After this update, the .flow file points to your S3 bucket as the data source used by SageMaker Data Wrangler.

Make sure the .flow file is closed before running this next step, or the new S3 file locations won’t be written to the file.

[ ]:
update_data_sources('01_music_dataprep.flow', tracks_data_source, ratings_data_source)
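
To check that the update took effect, it can help to list the S3 sources recorded in the flow. The sketch below is one possible way to do that. The helper name list_s3_sources and the trimmed example_flow dictionary are hypothetical and exist only for illustration; the key names (nodes, parameters, dataset_definition, name, s3ExecutionContext, s3Uri) are the ones that update_data_sources reads, and a real .flow file contains many more fields.

```python
def list_s3_sources(flow):
    """Return {dataset name: s3Uri} for every node that defines an S3 dataset."""
    sources = {}
    for node in flow.get("nodes", []):
        definition = node.get("parameters", {}).get("dataset_definition")
        if not definition:
            # transform nodes carry no dataset definition
            continue
        uri = definition.get("s3ExecutionContext", {}).get("s3Uri")
        if uri is not None:
            sources[definition.get("name")] = uri
    return sources


# Hypothetical, heavily trimmed flow used only for illustration.
example_flow = {
    "nodes": [
        {
            "parameters": {
                "dataset_definition": {
                    "name": "tracks.csv",
                    "s3ExecutionContext": {
                        "s3Uri": "s3://example-bucket/music-recommendation/tracks.csv"
                    },
                }
            }
        },
        {"parameters": {}},  # a node without a dataset definition
    ]
}

print(list_s3_sources(example_flow))
```

In this notebook, update_data_sources already returns the updated flow dictionary, so the helper could be applied to that return value directly.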

Explore the Data


[ ]:
tracks = pd.read_csv('./data/tracks.csv')
ratings = pd.read_csv('./data/ratings.csv')
[ ]:
tracks.head()
[ ]:
ratings.head()
[ ]:
print("{:,} different songs/tracks".format(tracks['trackId'].nunique()))
print("{:,} users".format(ratings['userId'].nunique()))
print("{:,} user rating events".format(ratings['ratingEventId'].nunique()))
[ ]:
tracks.groupby('genre')['genre'].count().plot.bar(title="Tracks by Genre");
[ ]:
# count rating events per user, then plot the distribution of those counts
ratings.groupby('userId')['ratingEventId'].count().plot.hist(bins=50, title="Distribution of # of Ratings by User");

Create some new data to ingest later

[ ]:
tracks_new = tracks[:300]
ratings_new = ratings[:1000]

# export dataframes to csv
tracks_new.to_csv('./data/tracks_new.csv', index=False)
ratings_new.to_csv('./data/ratings_new.csv', index=False)
[ ]:
s3_client.upload_file(Filename="./data/tracks_new.csv", Bucket=bucket, Key=f'{prefix}/data/tracks_new.csv')
s3_client.upload_file(Filename="./data/ratings_new.csv", Bucket=bucket, Key=f'{prefix}/data/ratings_new.csv')

Music Recommender Part 1: Data Prep using Data Wrangler

After you have finished running this notebook, you can open the Data Wrangler file 01_music_dataprep.flow.