Music Recommender Part 2c: Feature Store Creation - Ratings


This notebook creates a feature group for our ratings data to place in our feature store using the transformation instructions found in our .flow file.



💡 Quick Start: To save your processed data to Feature Store, create a feature group (see Create Feature Group below) and follow the instructions to run a SageMaker Processing Job.

This notebook uses Amazon SageMaker Feature Store (Feature Store) to create a feature group, executes your Data Wrangler flow 01_music_dataprep.flow on the entire dataset using a SageMaker Processing Job, and ingests the processed data into Feature Store.

[ ]:
import sys
import pprint
sys.path.insert(1, './code')
from parameter_store import ParameterStore
ps = ParameterStore(verbose=False)

parameters = ps.read('music-rec')

bucket = parameters['bucket']
dw_ecrlist = parameters['dw_ecrlist']
fg_name_tracks = parameters['fg_name_tracks']
flow_export_id = parameters['flow_export_id']
flow_s3_uri = parameters['flow_s3_uri']
pretrained_model_path = parameters['pretrained_model_path']
prefix = parameters['prefix']
ratings_data_source = parameters['ratings_data_source']
tracks_data_source = parameters['tracks_data_source']

Create Feature Group

What is a feature group

A single feature corresponds to a column in your dataset. A feature group is a predefined schema for a collection of features - each feature in the feature group has a specified data type and name. A single record in a feature group corresponds to a row in your dataframe. A feature store is a collection of feature groups. To learn more about SageMaker Feature Store, see Amazon Feature Store Documentation.
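To make the terminology concrete, here is a minimal sketch in plain Python with no SageMaker calls. `FEATURE_SCHEMA` and `validate_record` are hypothetical names used only for illustration; the feature names are borrowed from the ratings schema defined later in this notebook.

```python
# Hypothetical illustration: a feature group as a named, typed schema,
# and a record as one row that conforms to it.
FEATURE_SCHEMA = {
    "ratingEventId": str,   # String feature
    "userId": int,          # Integral feature
    "Rating": float,        # Fractional feature
    "EventTime": float,     # Fractional feature (event timestamp)
}


def validate_record(record, schema=FEATURE_SCHEMA):
    """Return a list of problems found in `record`; an empty list means it conforms."""
    problems = []
    for name, expected_type in schema.items():
        if name not in record:
            problems.append(f"missing feature: {name}")
        elif not isinstance(record[name], expected_type):
            problems.append(
                f"{name}: expected {expected_type.__name__}, "
                f"got {type(record[name]).__name__}"
            )
    for name in record:
        if name not in schema:
            problems.append(f"unexpected feature: {name}")
    return problems


# One dataframe row corresponds to one record.
row = {"ratingEventId": "evt-001", "userId": 42, "Rating": 4.0, "EventTime": 1609459200.0}
print(validate_record(row))  # an empty list: the row matches the schema
```

Feature Store performs its own validation on ingestion; this sketch only illustrates the column-to-feature and row-to-record correspondence.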

Define Feature Group


Select the Record identifier and Event time feature name. These are required parameters for feature group creation.

  * Record identifier name is the name of the feature defined in the feature group’s feature definitions whose value uniquely identifies a Record defined in the feature group’s feature definitions.

  * Event time feature name is the name of the EventTime feature of a Record in a FeatureGroup. An EventTime is a timestamp that represents the point in time when a new event occurs that corresponds to the creation or update of a Record in the FeatureGroup. All Records in the FeatureGroup must have a corresponding EventTime.

💡 Record identifier and Event time feature name are required for a feature group. After filling in the values, you can choose Run Selected Cell and All Below from the Run menu in the menu bar.

[ ]:
record_identifier_feature_name = "ratingEventId"
if record_identifier_feature_name is None:
   raise SystemExit("Select a column name as the feature group record identifier.")

event_time_feature_name = "EventTime"
if event_time_feature_name is None:
   raise SystemExit("Select a column name as the event time feature name.")

Feature Definitions

The following is a list of the feature names and feature types of the final dataset that will be produced when your data flow is used to process your input dataset. These are automatically generated from the step Custom Pyspark from Source: Answers.Csv. To save from a different step, go to Data Wrangler to select a new step to export.

💡 Configurable Settings

  1. You can select a subset of the features. By default all columns of the result dataframe will be used as features.

  2. You can change the Data Wrangler data type to one of the Feature Store supported types (Integral, Fractional, or String). The default type is set to String. This means that, if a column in your dataset is not a float or long type, it will default to String in your Feature Store.

For Event Time features, make sure the format follows the Feature Store Event Time feature format.
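As an illustration of the Event Time format, the sketch below converts a timezone-aware datetime into the two representations that, as far as the Feature Store documentation describes, are accepted: Unix epoch seconds (for a Fractional feature, as used by EventTime in this notebook) and an ISO-8601 UTC string (for a String feature). The helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_epoch_seconds(dt):
    """Unix epoch seconds as a float, suitable for a Fractional event time feature."""
    if dt.tzinfo is None:
        # Naive datetimes would be interpreted in local time; refuse them.
        raise ValueError("expected a timezone-aware datetime")
    return dt.timestamp()


def to_iso8601_utc(dt):
    """ISO-8601 UTC string (yyyy-MM-dd'T'HH:mm:ssZ), assumed valid for a String event time feature."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


event = datetime(2021, 1, 1, tzinfo=timezone.utc)
print(to_epoch_seconds(event))  # 1609459200.0
print(to_iso8601_utc(event))    # 2021-01-01T00:00:00Z
```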

The following is a list of the feature names and data types of the final dataset that will be produced when your data flow is used to process your input dataset.

[ ]:
column_schemas = [
    {
        "name": "ratingEventId",
        "type": "string"
    },
    {
        "name": "ts",
        "type": "long"
    },
    {
        "name": "userId",
        "type": "long"
    },
    {
        "name": "trackId",
        "type": "string"
    },
    {
        "name": "sessionId",
        "type": "long"
    },
    {
        "name": "itemInSession",
        "type": "long"
    },
    {
        "name": "Rating",
        "type": "float"
    },
    {
        "name": "EventTime",
        "type": "float"
    }
]

Below we create the SDK input for those feature definitions. Some schema types in Data Wrangler are not supported by Feature Store. For these types, the following code falls back to default_feature_type, which is set to String.

[ ]:
from sagemaker.feature_store.feature_definition import FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum

default_feature_type = FeatureTypeEnum.STRING
column_to_feature_type_mapping = {
    "float": FeatureTypeEnum.FRACTIONAL,
    "long": FeatureTypeEnum.INTEGRAL
}

feature_definitions = [
    FeatureDefinition(
        feature_name=column_schema['name'],
        feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type)
    ) for column_schema in column_schemas
]
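The fallback behavior can be checked without the SageMaker SDK. The sketch below mirrors the mapping above, with plain strings standing in for the FeatureTypeEnum members; `to_feature_type` and the `isExplicit` column are hypothetical and exist only to show an unsupported type falling back to String.

```python
# Plain-string stand-ins for FeatureTypeEnum.STRING / FRACTIONAL / INTEGRAL.
DEFAULT_FEATURE_TYPE = "String"
COLUMN_TO_FEATURE_TYPE = {
    "float": "Fractional",
    "long": "Integral",
}


def to_feature_type(column_type):
    """Map a Data Wrangler column type to a Feature Store type, defaulting to String."""
    return COLUMN_TO_FEATURE_TYPE.get(column_type, DEFAULT_FEATURE_TYPE)


sample_schema = [
    {"name": "ratingEventId", "type": "string"},
    {"name": "userId", "type": "long"},
    {"name": "Rating", "type": "float"},
    {"name": "isExplicit", "type": "bool"},  # hypothetical column, not in the ratings data
]

resolved = {col["name"]: to_feature_type(col["type"]) for col in sample_schema}
print(resolved)
```

Note that a column silently mapped to String is stored as text, so it is worth reviewing the resolved types before creating the feature group.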

Configure Feature Group



💡 Configurable Settings

  1. feature_group_name: name of the feature group.

  2. feature_store_offline_s3_uri: SageMaker Feature Store writes the data in the OfflineStore of a FeatureGroup to an S3 location owned by you.

  3. enable_online_store: controls if online store is enabled. Enabling the online store allows quick access to the latest value for a Record via the GetRecord API.

  4. iam_role: IAM role for executing the processing job.

[ ]:
from time import gmtime, strftime
import uuid
import sagemaker

# SageMaker session
sess = sagemaker.Session()

# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Flow name and a unique ID for this export (the ID is used later in the processing job name)
flow_name = "01_music_dataprep"
flow_export_name = f"flow-{flow_export_id}"

# Feature group name. You can give it a customized name.
feature_group_name = 'ratings-features-music-rec'
print(f"Feature Group Name: {feature_group_name}")

# SageMaker Feature Store writes the data in the OfflineStore of a FeatureGroup to an
# S3 location owned by you.
feature_store_offline_s3_uri = 's3://' + bucket

# controls if online store is enabled. Enabling the online store allows quick access to
# the latest value for a Record via the GetRecord API.
enable_online_store = True
[ ]:
fg_name_ratings = feature_group_name

ps.add({'fg_name_ratings': fg_name_ratings}, namespace='music-rec')
ps.store()

Initialize & Create Feature Group


[ ]:
# Initialize Boto3 session that is required to create feature group
import boto3
from sagemaker.session import Session

region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

The feature group is initialized below and created if it does not already exist.

[ ]:
from sagemaker.feature_store.feature_group import FeatureGroup

feature_group = FeatureGroup(
    name=feature_group_name, sagemaker_session=feature_store_session, feature_definitions=feature_definitions)

from botocore.exceptions import ClientError

# Only create the feature group if it doesn't already exist.
try:
    sagemaker_client.describe_feature_group(FeatureGroupName=feature_group_name)
    feature_group_exists = True
    print("Feature Group {0} already exists. Using {0}".format(feature_group_name))
except ClientError as e:
    error = e.response.get('Error', {}).get('Code')
    if error == "ResourceNotFound":
        feature_group_exists = False
        print("Creating Feature Group {}".format(feature_group_name))
        feature_group.create(
            s3_uri=feature_store_offline_s3_uri,
            record_identifier_name=record_identifier_feature_name,
            event_time_feature_name=event_time_feature_name,
            role_arn=iam_role,
            enable_online_store=enable_online_store
        )
    elif error == 'ResourceInUse':
        feature_group_exists = True
        print("Feature Group {0} already exists. Using {0}".format(feature_group_name))
    else:
        # Any other error (for example, missing permissions) should not be swallowed.
        raise

Poll the Feature Store API and wait until the feature group is ready.

[ ]:
import time
def wait_for_feature_group_creation_complete(feature_group):
    """Helper function that waits for feature group creation to complete."""
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise SystemExit(f"Failed to create feature group {feature_group.name}: {status}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(feature_group=feature_group)
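The helper above loops for as long as the status stays "Creating". A variant with an upper bound on the wait can be sketched as follows. This is a hedged illustration rather than part of the original notebook: `wait_for_status`, its parameters, and the stubbed responses are hypothetical, and the status source is passed in as a callable so the sketch runs without AWS access.

```python
import time


def wait_for_status(describe, in_progress="Creating", success="Created",
                    poll_seconds=5, timeout_seconds=600, sleep=time.sleep):
    """Poll `describe()` until the status leaves `in_progress` or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    status = describe().get("FeatureGroupStatus")
    while status == in_progress:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"status still {in_progress} after {timeout_seconds} seconds")
        sleep(poll_seconds)
        status = describe().get("FeatureGroupStatus")
    if status != success:
        raise RuntimeError(f"unexpected status: {status}")
    return status


# Stand-in for feature_group.describe: two "Creating" responses, then "Created".
_responses = iter(
    [{"FeatureGroupStatus": "Creating"}] * 2 + [{"FeatureGroupStatus": "Created"}]
)
print(wait_for_status(lambda: next(_responses), sleep=lambda seconds: None))
```

In the notebook, `describe` would be `feature_group.describe`, and the default `sleep` would be left in place.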

Now that the feature group is created, you will use a processing job to process your data at scale and ingest the transformed data into this feature group.

Inputs and Outputs



The below settings configure the inputs and outputs for the flow export.

💡 Configurable Settings

In Input - Source, you can configure the data sources that Data Wrangler will use as input.

  1. For S3 sources, configure the source attribute that points to the input S3 prefixes.

  2. For all other sources, configure attributes like query_string and database in the source’s DatasetDefinition object.

If you modify the inputs, the provided data must have the same schema and format as the data used in the flow. You should also re-execute the cells in this section if you have modified the settings of any data source.

[ ]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

data_sources = []

Input - S3 Source: tracks.csv

[ ]:
data_sources.append(ProcessingInput(
    source=f"{tracks_data_source}", # You could override this to point to another dataset on S3
    destination="/opt/ml/processing/tracks.csv",
    input_name="tracks.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

Input - S3 Source: ratings.csv

[ ]:
data_sources.append(ProcessingInput(
    source=f"{ratings_data_source}", # You could override this to point to another dataset on S3
    destination="/opt/ml/processing/ratings.csv",
    input_name="ratings.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

Output: Feature Store

Below are the inputs required by the SageMaker Python SDK to launch a processing job with feature store as an output.

[ ]:
from sagemaker.processing import FeatureStoreOutput

# Output name is auto-generated from the selected node's ID + output name from the flow file.
output_name = "9a283380-91ca-478e-be99-6ba3bf57c680.default" # ratings node

processing_job_output = ProcessingOutput(
    output_name=output_name,
    app_managed=True,
    feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name),
)

We already uploaded our flow file in the 02a notebook. The Data Wrangler flow is also provided to the Processing Job as an input source, which we configure below.

[ ]:
## Input - Flow: 01_music_dataprep.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

Run Processing Job



Job Configurations

💡 Configurable Settings

You can configure the following settings for Processing Jobs. If you change any configuration, re-execute this cell and all cells below it by choosing Run Selected Cell and All Below from the Run menu.

  1. IAM role for executing the processing job.

  2. A unique name for the processing job. Use a new name every time you re-execute the processing job.

  3. Data Wrangler Container URL.

  4. Instance count, instance type and storage volume size in GB.

  5. Content type for each output. Data Wrangler supports CSV as default and Parquet.

  6. Network Isolation settings
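For item 2, one way to get a fresh name on every run is to append a short random suffix and check the result before submitting. The sketch below assumes the documented SageMaker processing job name constraints (at most 63 characters, alphanumerics and hyphens, no leading or trailing hyphen); `make_job_name` and the sample export ID are hypothetical.

```python
import re
import uuid

# Assumed constraint on processing job names: alphanumerics and hyphens,
# starting and ending with an alphanumeric character, at most 63 characters.
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
MAX_JOB_NAME_LENGTH = 63


def make_job_name(prefix, export_id):
    """Build a job name that is unique per call and validate it locally."""
    suffix = uuid.uuid4().hex[:8]  # 8 random hex characters
    name = f"{prefix}-{export_id}-{suffix}"
    if len(name) > MAX_JOB_NAME_LENGTH or not JOB_NAME_PATTERN.match(name):
        raise ValueError(f"invalid processing job name: {name!r}")
    return name


# Hypothetical export ID, used only for illustration.
print(make_job_name("dw-flow-proc-music-rec-ratings", "03-19-22-10-30"))
```

Validating locally surfaces a malformed name immediately instead of as an API error when the job is submitted.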

[ ]:
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

processing_job_name = "dw-flow-proc-music-rec-ratings-{}-{}".format(flow_export_id, str(uuid.uuid4())[:8])
print(processing_job_name)

# Data Wrangler Container URL.
container_uri = f"{dw_ecrlist['region'][region]}.dkr.ecr.{region}.amazonaws.com/sagemaker-data-wrangler-container:1.x"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV as default and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# Output configuration used as processing job container arguments
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}
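The `output_config` dictionary is later serialized to JSON and passed to the container as a single `--output-config` argument. The sketch below shows what that string looks like. The output name is a hypothetical placeholder, and `shlex` is used only to demonstrate that the single-quoted payload round-trips; how the container itself parses the argument is not shown in this notebook.

```python
import json
import shlex

output_name = "example-node-id.default"  # hypothetical placeholder for the flow node output
output_config = {output_name: {"content_type": "CSV"}}

# Same formatting as the argument passed to the processing job.
argument = f"--output-config '{json.dumps(output_config)}'"
print(argument)
# --output-config '{"example-node-id.default": {"content_type": "CSV"}}'

# The quoted payload is valid JSON and decodes back to the original dictionary.
flag, payload = shlex.split(argument)
print(json.loads(payload) == output_config)  # True
```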

Create Processing Job

To launch a Processing Job, you will use the SageMaker Python SDK to create a Processor object.

[ ]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig
import json

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess
)

Job Status & S3 Output Location

Below you wait for the processing job to finish. If it finishes successfully, your feature group should be populated with the transformed feature values. In addition, the raw parameters used by the Processing Job will be printed.

[ ]:
%%time

# Run the processing job only if the feature group was newly created in this run
if feature_group_exists:
    print("Feature Group {0} already exists, so the processing job will not be run again".format(feature_group_name))
else:
    print("Creating Processing Job: {}".format(processing_job_name))
    processor.run(
        inputs=[flow_input] + data_sources,
        outputs=[processing_job_output],
        arguments=[f"--output-config '{json.dumps(output_config)}'"],
        wait=False,
        logs=False,
        job_name=processing_job_name
    )

    job_result = sess.wait_for_processing_job(processing_job_name)
    print(job_result)

You can view the newly created feature group in Studio. Refer to Use Amazon SageMaker Feature Store with Amazon SageMaker Studio for a detailed guide, or learn more about SageMaker Feature Store.