Build a Customer Churn Model for Music Streaming App Users: Date Pre-processing with SageMaker Data Wrangler and Processing Job

In this demo, you are going to learn how to use various SageMaker functionalities to build, train, and deploy the model from end to end, including data pre-processing steps like ingestion, cleaning and processing, feature engineering, training and hyperparameter tuning, model explainability, and eventually deploy the model. There are two parts of the demo: in part 1: Prepare Data, you will process the data with the help of Data Wrangler, then create features from the cleaned data. By the end of part 1, you will have a complete feature data set that contains all attributes built for each user, and it is ready for modeling. Then in part 2: Modeling and Reference, you will use the data set built from part 1 to find an optimal model for the use case, then test the model predictability with the test data. To start with Part 2, you can either read in data from the output of your Part 1 results, or use the provided ‘data/full_feature_data.csv’ as the input for the next steps.

For how to set up the SageMaker Studio Notebook environment, please check the onboarding video. And for a list of services covered in the use case demo, please check the documentation linked in each section.

Content

Overview

What is Customer Churn and why is it important for businesses?

Customer churn, or customer retention/attrition, means a customer has the tendency to leave and stop paying for a business. It is one of the primary metrics companies want to track to get a sense of their customer satisfaction, especially for a subscription-based business model. The company can track churn rate (defined as the percentage of customers churned during a period) as a health indicator for the business, but we would love to identify the at-risk customers before they churn and offer appropriate treatment to keep them with the business, and this is where machine learning comes into play.

Use Cases for Customer Churn

Any subscription-based business would track customer churn as one of the most critical Key Performance Indicators (KPIs). Such companies and industries include Telecom companies (cable, cell phone, internet, etc.), digital subscriptions of media (news, forums, blogposts platforms, etc.), music and video streaming services, and other Software as a Service (SaaS) providers (e-commerce, CRM, Mar-Tech, cloud computing, video conference provider, and visualization and data science tools, etc.)

Define Business problem

To start with, here are some common business problems to consider depending on your specific use cases and your focus: * Will this customer churn (cancel the plan, cancel the subscription)? * Will this customer downgrade a pricing plan? * For a subscription business model, will a customer renew his/her subscription?

Machine learning problem formulation

Classification: will this customer churn?

To goal of classification is to identify the at-risk customers and sometimes their unusual behavior, such as: will this customer churn or downgrade their plan? Is there any unusual behavior for a customer? The latter question can be formulated as an anomaly detection problem.

Time Series: will this customer churn in the next X months? When will this customer churn?

You can further explore your users by formulating the problem as a time series one and detect when will the customer churn.

Data Requirements

Data collection Sources

Some most common data sources used to construct a data set for churn analysis are: * Customer Relationship Management platform (CRM), * engagement and usage data (analytics services), * passive feedback (ratings based on your request), and active feedback (customer support request, feedback on social media and review platforms).

Construct a Data Set for Churn Analysis

Most raw data collected from the sources mentioned above are huge and often needs a lot of cleaning and pre-processing. For example, usage data is usually event-based log data and can be more than a few gigabytes every day; you can aggregate the data to user-level daily for further analysis. Feedback and review data are mostly text data, so you would need to clean and pre-process the natural language data to be normalized, machine-readable data. If you are joining multiple data sources (especially from different platforms) together, you would want to make sure all data points are consistent, and the user identity can be matched across different platforms.

Challenges with Customer Churn

  • Business related

    • Importance of domain knowledge: this is critical when you start building features for the machine learning model. It is important to understand the business enough to decide which features would trigger retention.

  • Data issues

    • fewer churn data available (imbalanced classes): data for churn analysis is often very imbalanced as most of the customers of a business are happy customers (usually).

    • User identity mapping problem: if you are joining data from different platforms (CRM, email, feedback, mobile app, and website usage data), you would want to make sure user A is recognized as the same user across multiple platforms. There are third-party solutions that help you tackle this problem.

    • Not collecting the right data for the use case or Lacking enough data

Use Case Study - Music Streaming User Churn Prediction

Data Selection

You will use generated music streaming data that is simulated to imitate music streaming user behaviors. The data simulated contains 1100 users and their user behavior for one year (2019/10/28 - 2020/10/28). Data is simulated using the EventSim and does not contain any real user data.

  • Observation window: you will use 1 year of data to generate predictions.

  • Explanation of fields:

    • ts: event UNIX timestamp

    • userId: a randomly assigned unique user id

    • sessionId: a randomly assigned session id unique to each user

    • page: event taken by the user, e.g. “next song”, “upgrade”, “cancel”

    • auth: whether the user is a logged-in user

    • method: request method, GET or PUT

    • status: request status

    • level: if the user is a free or paid user

    • itemInSession: event happened in the session

    • location: location of the user’s IP address

    • userAgent: agent of the user’s device

    • lastName: user’s last name

    • firstName: user’s first name

    • registration: user’s time of registration

    • gender: gender of the user

    • artist: artist of the song the user is playing at the event

    • song: song title the user is playing at the event

    • length: length of the session

  • the data will be downloaded from Github and contained in an *Amazon Simple Storage Service* (Amazon S3) bucket.

For this specific use case, you will focus on a solution to predict whether a customer will cancel the subscription. Some possible expansion of the work includes: * predict plan downgrading * when a user will churn * add song attributes (genre, playlist, charts) and user attributes (demographics) to the data * add user feedback and customer service requests to the data

Architecture Diagram

The services covered in the use case and an architecture diagram is shown below.

<img src="image/use_case_diagram_v2.png" width="800"/>
[1]:
## The output from Data Wrangler is also provided in the github repo (data/data_wrangler_output.csv).
## You can also read the provided csv directly.

Feature engineering with SageMaker Processing Job

For user churn analysis, usually, you can consider build features from the following aspects:

  • Generate base features:

    • user behavior features (listening behavior, app behavior).

    • customer demographic features.

    • customer support features (interactions, ratings, etc.)

  • Formulate time series as features:

    • construct streaming time as time series.

    • build features in the different time windows (e.g. total songs listened in the last 7 days, 30 days, 180 days, etc.)

For this use case, after exploring the data and with all the findings you gathered, now is the time to create features used for your model. Since the data set is time series, you can enrich your features by adding a time factor to it: e.g., for the total number of songs listened, you can create features like total songs listened in the last 7 days, last 30 days, last 90 days, last 180 days, etc. The features built for these use cases will be at the user level - each row represents one user, and will include the following:

  • daily features:

    • average_events_weekday (numerical): average number of events per day during weekday

    • average_events_weekend (numerical): average number of events per day during the weekend

    • num_ads_7d: number of ads in last 7 days

    • num_error_7d: total errors encountered in last 7 days

    • num_songs_played_7d: total songs played in last 7 days

    • num_songs_played_30d: total songs played in last 30 days

    • num_songs_played_90d: total songs played in last 90 days

  • user features:

    • num_artists (numerical): number of artists the user has listened to

    • num_songs (numerical): number of songs played

    • num_ads (numerical): number of ads played

    • num_thumbsup (numerical): number of times the user likes a song

    • num_thumbsdown (numerical): number of times the user dislikes a song

    • num_playlist (numerical): number of times user adds a song to a playlist

    • num_addfriend (numerical): number of times user adds a friend

    • num_error (numerical): number of times user encountered an error

    • user_downgrade (binary): user has downgraded plan

    • user_upgrade (binary): user has upgraded plan

    • percentage_song: percentage of the user’s action is ‘NextSong’ (only listens to songs)

    • percentage_ad: percentage of the user’s action is ‘Roll Advert’

    • repeats_ratio: percentage of total songs that are repeats

    • days_since_active: days since the user registered and leave (if the user cancels)

  • Session features:

    • num_sessions: number of total sessions

    • avg_time_per_session: average time spent per session

    • avg_events_per_session: average number of events per session

    • avg_gap_between_session: average time between sessions

The following function will create the processing job with SageMaker Processing, a new Python SDK that lets data scientists and ML engineers easily run preprocessing, postprocessing and model evaluation workloads on Amazon SageMaker. This SDK uses SageMaker’s built-in container for scikit-learn, possibly the most popular library for data set transformation. You can find a complete guide to the SageMaker Processing job in this blog.

[2]:
!pip install -q pandas=='1.1.5'
ERROR: pg8000 1.17.0 has requirement scramp==1.2.0, but you'll have scramp 1.2.2 which is incompatible.
[3]:
# !pip -uQ install s3fs
[ ]:
%store -r
%store
[ ]:
processing_output_filename
[27]:
import sagemaker
import json
import pandas as pd
import numpy as np
import glob
import boto3
[31]:
sagemaker_session = sagemaker.Session()
s3 = sagemaker_session.boto_session.resource("s3")

region = boto3.Session().region_name
role = sagemaker.get_execution_role()
smclient = boto3.Session().client("sagemaker")

output_path = f"s3://{bucket}/{prefix}/data/processing/"
[32]:
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    #     framework_version='0.20.0',
    framework_version="0.23-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
)
[34]:
### SAVE THE OUTPUT FILE NAME FROM PROCESSING JOB
processing_job_output_name = 'processing_job_output.csv'
%store processing_job_output_name
[35]:
%%writefile preprocessing.py

import os
import warnings
import time
import pandas as pd
import argparse
import subprocess
import sys

subprocess.check_call([sys.executable, "-m", "pip", "install", "awswrangler"])
import awswrangler as wr

start_time = time.time()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--dw-output-path")
    parser.add_argument("--processing-output-filename")

    args, _ = parser.parse_known_args()
    print("Received arguments {}".format(args))

    data_s3_uri = args.dw_output_path
    output_filename = args.processing_output_filename

    #     data_path = os.path.join('/opt/ml/processing/input', dw_output_name)
    #     df = pd.read_csv(data_path)
    df = wr.s3.read_csv(path=data_s3_uri, dataset=True)
    ## convert to time
    df["date"] = pd.to_datetime(df["ts"], unit="ms")
    df["ts_dow"] = df["date"].dt.weekday
    df["ts_date_day"] = df["date"].dt.date
    df["ts_is_weekday"] = [1 if x in [0, 1, 2, 3, 4] else 0 for x in df["ts_dow"]]
    df["registration_ts"] = pd.to_datetime(df["registration"], unit="ms").dt.date
    ## add labels
    df["churned_event"] = [1 if x == "Cancellation Confirmation" else 0 for x in df["page"]]
    df["user_churned"] = df.groupby("userId")["churned_event"].transform("max")

    ## convert pages categorical variables to numerical
    events_list = [
        "NextSong",
        "Thumbs Down",
        "Thumbs Up",
        "Add to Playlist",
        "Roll Advert",
        "Add Friend",
        "Downgrade",
        "Upgrade",
        "Error",
    ]
    usage_column_name = []
    for event in events_list:
        event_name = "_".join(event.split()).lower()
        usage_column_name.append(event_name)
        df[event_name] = [1 if x == event else 0 for x in df["page"]]
    ## feature engineering
    # average_events_weekday (numerical): average number of events per day during weekday
    # average_events_weekend (numerical): average number of events per day during the weekend
    base_df = (
        df.groupby(["userId", "ts_date_day", "ts_is_weekday"])
        .agg({"page": "count"})
        .groupby(["userId", "ts_is_weekday"])["page"]
        .mean()
        .unstack(fill_value=0)
        .reset_index()
        .rename(columns={0: "average_events_weekend", 1: "average_events_weekday"})
    )

    # num_ads_7d, num_songs_played_7d, num_songs_played_30d, num_songs_played_90d, num_ads_7d, num_error_7d
    base_df_daily = (
        df.groupby(["userId", "ts_date_day"])
        .agg({"page": "count", "nextsong": "sum", "roll_advert": "sum", "error": "sum"})
        .reset_index()
    )
    feature34 = (
        base_df_daily.groupby(["userId", "ts_date_day"])
        .tail(7)
        .groupby(["userId"])
        .agg({"nextsong": "sum", "roll_advert": "sum", "error": "sum"})
        .reset_index()
        .rename(
            columns={
                "nextsong": "num_songs_played_7d",
                "roll_advert": "num_ads_7d",
                "error": "num_error_7d",
            }
        )
    )
    feature5 = (
        base_df_daily.groupby(["userId", "ts_date_day"])
        .tail(30)
        .groupby(["userId"])
        .agg({"nextsong": "sum"})
        .reset_index()
        .rename(columns={"nextsong": "num_songs_played_30d"})
    )
    feature6 = (
        base_df_daily.groupby(["userId", "ts_date_day"])
        .tail(90)
        .groupby(["userId"])
        .agg({"nextsong": "sum"})
        .reset_index()
        .rename(columns={"nextsong": "num_songs_played_90d"})
    )
    # num_artists, num_songs, num_ads, num_thumbsup, num_thumbsdown, num_playlist, num_addfriend, num_error, user_downgrade,
    # user_upgrade, percentage_ad, days_since_active
    base_df_user = (
        df.groupby(["userId"])
        .agg(
            {
                "page": "count",
                "nextsong": "sum",
                "artist": "nunique",
                "song": "nunique",
                "thumbs_down": "sum",
                "thumbs_up": "sum",
                "add_to_playlist": "sum",
                "roll_advert": "sum",
                "add_friend": "sum",
                "downgrade": "max",
                "upgrade": "max",
                "error": "sum",
                "ts_date_day": "max",
                "registration_ts": "min",
                "user_churned": "max",
            }
        )
        .reset_index()
    )
    base_df_user["percentage_ad"] = base_df_user["roll_advert"] / base_df_user["page"]
    base_df_user["days_since_active"] = (
        base_df_user["ts_date_day"] - base_df_user["registration_ts"]
    ).dt.days
    # repeats ratio
    base_df_user["repeats_ratio"] = 1 - base_df_user["song"] / base_df_user["nextsong"]

    # num_sessions, avg_time_per_session, avg_events_per_session,
    base_df_session = (
        df.groupby(["userId", "sessionId"])
        .agg({"length": "sum", "page": "count", "date": "min"})
        .reset_index()
    )
    base_df_session["prev_session_ts"] = base_df_session.groupby(["userId"])["date"].shift(1)
    base_df_session["gap_session"] = (
        base_df_session["date"] - base_df_session["prev_session_ts"]
    ).dt.days
    user_sessions = (
        base_df_session.groupby("userId")
        .agg({"sessionId": "count", "length": "mean", "page": "mean", "gap_session": "mean"})
        .reset_index()
        .rename(
            columns={
                "sessionId": "num_sessions",
                "length": "avg_time_per_session",
                "page": "avg_events_per_session",
                "gap_session": "avg_gap_between_session",
            }
        )
    )

    # merge features together
    base_df["userId"] = base_df["userId"].astype("int")
    final_feature_df = base_df.merge(feature34, how="left", on="userId")
    final_feature_df = final_feature_df.merge(feature5, how="left", on="userId")
    final_feature_df = final_feature_df.merge(feature6, how="left", on="userId")
    final_feature_df = final_feature_df.merge(user_sessions, how="left", on="userId")
    final_feature_df = final_feature_df.merge(base_df_user, how="left", on="userId")

    final_feature_df = final_feature_df.fillna(0)
    # renaming columns
    final_feature_df.columns = [
        "userId",
        "average_events_weekend",
        "average_events_weekday",
        "num_songs_played_7d",
        "num_ads_7d",
        "num_error_7d",
        "num_songs_played_30d",
        "num_songs_played_90d",
        "num_sessions",
        "avg_time_per_session",
        "avg_events_per_session",
        "avg_gap_between_session",
        "num_events",
        "num_songs",
        "num_artists",
        "num_unique_songs",
        "num_thumbs_down",
        "num_thumbs_up",
        "num_add_to_playlist",
        "num_ads",
        "num_add_friend",
        "num_downgrade",
        "num_upgrade",
        "num_error",
        "ts_date_day",
        "registration_ts",
        "user_churned",
        "percentage_ad",
        "days_since_active",
        "repeats_ratio",
    ]
    # only keep created feature columns
    final_feature_df = final_feature_df[
        [
            "userId",
            "user_churned",
            "average_events_weekend",
            "average_events_weekday",
            "num_songs_played_7d",
            "num_ads_7d",
            "num_error_7d",
            "num_songs_played_30d",
            "num_songs_played_90d",
            "num_sessions",
            "avg_time_per_session",
            "avg_events_per_session",
            "avg_gap_between_session",
            "num_events",
            "num_songs",
            "num_artists",
            "num_thumbs_down",
            "num_thumbs_up",
            "num_add_to_playlist",
            "num_ads",
            "num_add_friend",
            "num_downgrade",
            "num_upgrade",
            "num_error",
            "percentage_ad",
            "days_since_active",
            "repeats_ratio",
        ]
    ]

    print("shape of file to append:\t\t{}".format(final_feature_df.shape))
    iter_end_time = time.time()
    end_time = time.time()
    print("minutes elapsed: {}".format(str((end_time - start_time) / 60)))

    final_features_output_path = os.path.join("/opt/ml/processing/output", output_filename)
    print("Saving processed data to {}".format(final_features_output_path))
    final_feature_df.to_csv(final_features_output_path, header=True, index=False)
Overwriting preprocessing.py
[36]:
output_path = processing_output_filename
[ ]:
%%time
from sagemaker.processing import ProcessingInput, ProcessingOutput

processing_job_output_path = f"s3://{bucket}/{prefix}/data/processing"

sklearn_processor.run(
    code="preprocessing.py",
    outputs=[
        ProcessingOutput(
            output_name="processed_data",
            source="/opt/ml/processing/output",
            destination=processing_job_output_path,
        )
    ],
    arguments=[
        "--dw-output-path",
        processing_job_output_path,
        "--processing-output-filename",
        processing_job_output_name,
    ],
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()
[ ]:
preprocessing_job_description

Congratulations! You have completed Part1: Prepare the data, and now you should have created the complete feature set that is ready for modeling. You can proceed to Part2: modeling and Reference.

PART 2: Modeling and Reference

now that you have created the complete feature set, you can start to explore and find a best-working model for your churn use case. By the end of part 2, you will select an algorithm, find the best sets of hyperparameter for the model, examine how well the model performs, and finally find the top influential features.

To start with Part 2, you can either read in data from the output of your Part 1 results, or use the provided ‘data/full_feature_data.csv’ as the input (variable dataframe processed_data) for the next steps.

Data Splitting

You formulated the use case as a classification problem on user level, so you can randomly split your data from last step into train/validation/test. If you want to predict “will user X churn in the next Y days” on per user per day level, you should think about spliting data in chronological order instead of random.

You should split the data and make sure that data of both classes exist in your train, validation and test sets, to make sure both classes are represented in your data.

Find the output of Processing Job

[ ]:
processing_job_output_uri = f"{processing_job_output_path}/{processing_job_output_name}"
processing_job_output_uri
[ ]:
!aws s3 cp $processing_job_output_uri ./data
[48]:
processed_data = pd.read_csv(processing_job_output_uri)
[43]:
# Optional: you can also load the processed data from the provided feature set
# processed_data = pd.read_csv('./data/full_feature_data.csv')
[49]:
processed_data.head(4)
[49]:
userId user_churned average_events_weekend average_events_weekday num_songs_played_7d num_ads_7d num_error_7d num_songs_played_30d num_songs_played_90d num_sessions ... num_thumbs_up num_add_to_playlist num_ads num_add_friend num_downgrade num_upgrade num_error percentage_ad days_since_active repeats_ratio
0 11001 0.0 189.875 152.608696 8270 14 2 8270 8270 51 ... 586 280 14 162 1 1 2 0.001392 359 0.589722
1 11002 0.0 141.000 153.333333 952 2 0 952 952 7 ... 82 32 2 28 1 0 0 0.001664 265 0.526261
2 11003 1.0 197.500 241.750000 7734 24 18 7734 7734 37 ... 544 206 24 138 1 1 18 0.002576 66 0.587665
3 11004 1.0 140.000 240.888889 2168 4 2 2168 2168 7 ... 136 60 4 18 1 0 2 0.001546 48 0.538284

4 rows × 27 columns

Split data to train/validation/test by 70/20/10

[50]:
data = processed_data.sample(frac=1, random_state=1729)
grouped_df = data.groupby("user_churned")
arr_list = [np.split(g, [int(0.7 * len(g)), int(0.9 * len(g))]) for i, g in grouped_df]

train_data = pd.concat([t[0] for t in arr_list])
validation_data = pd.concat([t[1] for t in arr_list])
test_data = pd.concat([v[2] for v in arr_list])
[51]:
def process_data(data, name, header=False):
    data = data.drop(columns=["userId"])
    data = pd.concat([data["user_churned"], data.drop(["user_churned"], axis=1)], axis=1)
    data.to_csv(name, header=header, index=False)
[52]:
process_data(train_data, "data/train_updated.csv")
process_data(validation_data, "data/validation_updated.csv")
process_data(test_data, "data/test_updated.csv")

process_data(train_data, "data/train_w_header.csv", header=True)
process_data(validation_data, "data/validation_w_header.csv", header=True)
process_data(test_data, "data/test_w_header.csv", header=True)

Save splitted data to S3

The splitted data is provided in the /data folder. You can also upload the provided files (data/train_updated.csv,data/validation_updated.csv, data/test_updated.csv) and proceed to the next step.

[53]:
import os

s3_input_train = (
    boto3.Session()
    .resource("s3")
    .Bucket(bucket)
    .Object(os.path.join(prefix, "train/train.csv"))
    .upload_file("data/train_updated.csv")
)
s3_input_validation = (
    boto3.Session()
    .resource("s3")
    .Bucket(bucket)
    .Object(os.path.join(prefix, "validation/validation.csv"))
    .upload_file("data/validation_updated.csv")
)
s3_input_validation = (
    boto3.Session()
    .resource("s3")
    .Bucket(bucket)
    .Object(os.path.join(prefix, "test/test_labeled.csv"))
    .upload_file("data/test_updated.csv")
)

Disclaimer

The data used in this notebook is synthetic and does not contain real user data. The results (all the names, emails, IP addresses, and browser information) of this simulation are fake.

Citation

The data used in this notebook is simulated using the EventSim.

[ ]: