Train Machine Learning Models using Amazon Keyspaces as a Data Source

Contributors: Vadim Lyakhovich (AWS), Ram Pathangi (AWS), Parth Patel (AWS)

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved

Prerequisites

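The notebook execution role must include permissions to access Amazon Keyspaces. It must also be allowed to assume itself, because the notebook calls sts:AssumeRole on its own execution role to obtain temporary credentials.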

  • To access the Amazon Keyspaces database, attach the AmazonKeyspacesFullAccess managed policy. AmazonKeyspacesReadOnlyAccess is not enough here, because this notebook also creates, writes to, and drops a keyspace. In production, follow the principle of least privilege.

See more at `AWS Identity and Access Management for Amazon Keyspaces <https://docs.aws.amazon.com/keyspaces/latest/devguide/security-iam.html>`__.
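
In production, a scoped-down identity policy could replace the managed policy. The sketch below builds one as a Python dict. It uses the Keyspaces IAM actions cassandra:Create, cassandra:Modify, cassandra:Select, and cassandra:Drop, with keyspace ARNs in the form arn:aws:cassandra:REGION:ACCOUNT:/keyspace/NAME. Some parts are assumptions for illustration: the helper name build_notebook_policy, the blog_ prefix wildcard, and the account, Region, and role values in the usage. The role's trust policy must separately allow the role to assume itself. Treat this as a starting point to check against the IAM documentation linked above, not a vetted policy.

```python
import json


def build_notebook_policy(region: str, account_id: str, role_arn: str,
                          keyspace_prefix: str = "blog_") -> dict:
    """Hypothetical helper: a least-privilege identity policy for this notebook.

    Assumes the notebook's keyspaces are named <keyspace_prefix>yyyymmdd.
    """
    base = f"arn:aws:cassandra:{region}:{account_id}:/keyspace"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Create, write, read, and drop only the notebook's own keyspaces and tables
                "Effect": "Allow",
                "Action": ["cassandra:Create", "cassandra:Modify",
                           "cassandra:Select", "cassandra:Drop"],
                "Resource": [f"{base}/{keyspace_prefix}*"],
            },
            {
                # The notebook reads system tables such as system_schema.keyspaces
                "Effect": "Allow",
                "Action": ["cassandra:Select"],
                "Resource": [f"{base}/system*"],
            },
            {
                # The notebook assumes its own execution role for temporary credentials
                "Effect": "Allow",
                "Action": ["sts:AssumeRole"],
                "Resource": [role_arn],
            },
        ],
    }


if __name__ == "__main__":
    policy = build_notebook_policy(
        "us-east-1",
        "111122223333",  # placeholder account ID
        "arn:aws:iam::111122223333:role/MyNotebookRole",  # placeholder role ARN
    )
    print(json.dumps(policy, indent=2))
```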

Note:

Amazon Keyspaces is not available in every AWS Region. Before you start, confirm that the Region your notebook runs in is supported.

This notebook was tested with the conda_python3 kernel and should work with Python 3.x.

In this notebook,

  1. First, we install the SigV4 authentication plugin, which we use to connect to Amazon Keyspaces.

    The Amazon Keyspaces SigV4 authentication plugin for Cassandra client drivers lets you authenticate calls to Amazon Keyspaces *using IAM access keys instead of a username and password*. To learn more about how the plugin enables `IAM users, roles, and federated identities <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html>`__ to authenticate Amazon Keyspaces API requests, see `AWS Signature Version 4 process (SigV4) <https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html>`__.

  2. Next, we establish a connection to Amazon Keyspaces

  3. Next, we create a new Keyspace ***blog_(yyyymmdd)*** and a new table ***online_retail***

  4. Next, we will download retail data about customers.

  5. Next, we will ingest retail data about customers into Keyspaces.

  6. Next, we use a notebook in SageMaker Studio to collect data from the Keyspaces database and prepare it for training with the k-means clustering algorithm. SageMaker Studio supports end-to-end ML development. You can use this notebook as a starting point, customize it for your use case, and share it with collaborators without requiring them to install additional software.

  7. Next, we train a k-means model to cluster the customers.

  8. After training completes, we can view the mapping between each customer and its assigned cluster.

  9. Finally, we clean up by dropping the keyspace, including its table, to avoid future charges.

[ ]:
# Install missing packages and import dependencies

# Install the Amazon Keyspaces SigV4 authentication plugin for the Cassandra driver
%pip install cassandra-sigv4

# Download the Starfield root certificate used for TLS connections to Amazon Keyspaces
!curl https://certs.secureserver.net/repository/sf-class2-root.crt -O

# Imports
import csv
import time
from datetime import datetime, timedelta
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED

import boto3
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pandas import DataFrame
from sagemaker import get_execution_role
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra_sigv4.auth import SigV4AuthProvider
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler

# Get temporary credentials by assuming the notebook execution role
client = boto3.client("sts")

role = get_execution_role()
role_info = {"RoleArn": role, "RoleSessionName": "session1"}
print(role_info)

credentials = client.assume_role(**role_info)
[ ]:
# Connect to Amazon Keyspaces from the SageMaker notebook using the role's temporary credentials
boto_session = boto3.session.Session(
    aws_access_key_id=credentials["Credentials"]["AccessKeyId"],
    aws_secret_access_key=credentials["Credentials"]["SecretAccessKey"],
    aws_session_token=credentials["Credentials"]["SessionToken"],
)

region_name = boto_session.region_name

# Configure TLS: Amazon Keyspaces only accepts encrypted connections
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations("sf-class2-root.crt")
ssl_context.verify_mode = CERT_REQUIRED

# Sign requests with SigV4 instead of a username and password
auth_provider = SigV4AuthProvider(boto_session)

keyspaces_host = "cassandra." + region_name + ".amazonaws.com"

# Amazon Keyspaces accepts TLS connections on port 9142
cluster = Cluster([keyspaces_host], ssl_context=ssl_context, auth_provider=auth_provider, port=9142)
session = cluster.connect()


# Read from a system table. Amazon Keyspaces is serverless, so there is no database
# to provision ahead of time.
r = session.execute("select * from system_schema.keyspaces")

# Load the Keyspaces rows into a pandas DataFrame
df = DataFrame(r)
print(df)

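Download sample data

This example uses a sample of the Online Retail II dataset from the UCI Machine Learning Repository (http://archive.ics.uci.edu/ml) [1], copied to the public sagemaker-sample-files S3 bucket.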

References

[1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

[ ]:
# Download sample data

!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/online_retail/online_retail_II_20k.csv .

In this step, we create a new keyspace ***blog_yyyymmdd*** and a table ***online_retail***:

CREATE KEYSPACE IF NOT EXISTS blog_yyyymmdd
WITH
    REPLICATION = {'class': 'SingleRegionStrategy'};


CREATE TABLE IF NOT EXISTS blog_yyyymmdd.online_retail (
    invoice      text,
    stock_code   text,
    description  text,
    quantity     int,
    invoice_date date,
    price        decimal,
    customer_id  text,
    country      text,
    PRIMARY KEY (invoice, stock_code));
[ ]:
# Create the keyspace, named blog_yyyymmdd after today's date (zero-padded month and day)
keyspaces_schema = "blog_" + datetime.now().strftime("%Y%m%d")

createKeyspace = """CREATE KEYSPACE IF NOT EXISTS %s
WITH
    REPLICATION = {'class': 'SingleRegionStrategy'}; """
cr = session.execute(createKeyspace % keyspaces_schema)
# Keyspace creation in Amazon Keyspaces is asynchronous, so wait before using it
time.sleep(5)
print("Keyspace '" + keyspaces_schema + "' created")

# Create the table
createTable = """CREATE TABLE IF NOT EXISTS %s.online_retail (
 invoice text,
 stock_code text,
 description text,
 quantity int,
 invoice_date date,
 price decimal,
 customer_id text,
 country text,
   PRIMARY KEY (invoice,stock_code));
"""
cr = session.execute(createTable % keyspaces_schema)
# Table creation is also asynchronous; wait before writing to it
time.sleep(20)
print("Table 'online_retail' created")
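
The fixed time.sleep calls above only guess how long creation takes. Amazon Keyspaces reports each table's status in the system_schema_mcs.tables system table, so an alternative is to poll until the status is ACTIVE. This is a minimal sketch that assumes a connected driver session like the one created earlier. The helper name wait_for_table_active and its timeout and poll-interval defaults are hypothetical.

```python
import time


def wait_for_table_active(session, keyspace, table, timeout=120.0, poll_interval=2.0):
    """Poll system_schema_mcs.tables until the table status is ACTIVE.

    Hypothetical helper. `session` is any object whose execute(query, params)
    returns rows with a `status` attribute (the Cassandra driver returns
    namedtuple rows by default). Returns True when ACTIVE, False on timeout.
    """
    query = (
        "SELECT status FROM system_schema_mcs.tables "
        "WHERE keyspace_name = %s AND table_name = %s"
    )
    deadline = time.monotonic() + timeout
    while True:
        rows = list(session.execute(query, (keyspace, table)))
        # An empty result means the table is not visible in the system table yet
        if rows and rows[0].status == "ACTIVE":
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
```

In the notebook, this would be called as wait_for_table_active(session, keyspaces_schema, "online_retail") in place of the 20-second sleep.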

Reading the Online Retail CSV file and ingesting it into the Keyspaces table

[ ]:
# Populate the table with the sample data
from decimal import Decimal

# CSV file name
filename = "online_retail_II_20k.csv"
ROW_LIMIT = 20000  # stop after this many CSV lines (header line included)
PRINT_ROWS = 2000  # print progress every PRINT_ROWS lines


# Column names from the CSV header row
fields = []

insert = (
    "INSERT INTO "
    + keyspaces_schema
    + '.online_retail ("invoice","stock_code","description","quantity","invoice_date","price","customer_id","country") VALUES (?,?,?,?,?,?,?,?);'
)

# Prepare the statement once and reuse it for every row.
# Amazon Keyspaces requires LOCAL_QUORUM consistency for writes.
prepared = session.prepare(insert)
prepared.consistency_level = ConsistencyLevel.LOCAL_QUORUM


print("Start loading", ROW_LIMIT, "rows into the table at", datetime.now())
start_time = time.monotonic()

# Read the CSV file
with open(filename, "r", encoding="utf-8-sig") as csvfile:
    csvreader = csv.reader(csvfile)

    # The first row holds the field names
    fields = next(csvreader)

    # Insert the data rows one at a time
    for row in csvreader:
        try:
            if (csvreader.line_num % PRINT_ROWS) == 0:
                print("Rows so far: %d" % (csvreader.line_num))
                print(datetime.now())

            # invoice_date is a CQL date column, so keep only the date part
            inv_date = datetime.strptime(row[4], "%m/%d/%y %H:%M").date()
            session.execute(
                prepared,
                (
                    str(row[0]),
                    str(row[1]),
                    str(row[2]),
                    int(row[3]),
                    inv_date,
                    # Decimal keeps the exact price; a float would add binary rounding noise
                    Decimal(row[5]),
                    str(row[6]),
                    str(row[7]),
                ),
            )

            if csvreader.line_num >= ROW_LIMIT:
                break
        except Exception as ex:
            print("Error for row %d" % (csvreader.line_num))
            print(row)
            print(ex)

    # line_num also counts the header line
    print("Total no. of lines read: %d" % (csvreader.line_num))

end_time = time.monotonic()
print("Load time:", timedelta(seconds=end_time - start_time), "for", ROW_LIMIT, "rows")
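
The loop above waits for each INSERT to finish before sending the next one. The DataStax Python driver also provides cassandra.concurrent.execute_concurrent_with_args, which keeps several requests in flight at once. The sketch below assumes the CSV layout and date format match the loop above. The helper names parse_row and load_concurrently are hypothetical, and the concurrency of 50 is an arbitrary starting point. Your table's Amazon Keyspaces capacity settings still limit throughput.

```python
import csv
from datetime import datetime
from decimal import Decimal
from itertools import islice


def parse_row(row):
    """Convert one CSV row into the parameter tuple for the prepared INSERT."""
    return (
        row[0],
        row[1],
        row[2],
        int(row[3]),
        datetime.strptime(row[4], "%m/%d/%y %H:%M").date(),
        Decimal(row[5]),
        row[6],
        row[7],
    )


def load_concurrently(session, prepared, filename, row_limit, concurrency=50):
    """Hypothetical helper: insert up to row_limit data rows concurrently."""
    # Imported here so parse_row can be used without the driver installed
    from cassandra.concurrent import execute_concurrent_with_args

    params = []
    with open(filename, "r", encoding="utf-8-sig") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # skip the header row
        for r in islice(reader, row_limit):
            try:
                params.append(parse_row(r))
            except (ValueError, IndexError, ArithmeticError) as ex:
                # Decimal's InvalidOperation is an ArithmeticError
                print("Skipping row", r, ex)

    results = execute_concurrent_with_args(
        session, prepared, params, concurrency=concurrency, raise_on_first_error=False
    )
    # Each result is a (success, result_or_exception) pair
    failures = [res for ok, res in results if not ok]
    return len(params) - len(failures), failures
```

In the notebook, you might call it as loaded, failures = load_concurrently(session, prepared, filename, ROW_LIMIT), reusing the prepared statement and its LOCAL_QUORUM consistency level.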

Now that the data is in Amazon Keyspaces, let's read it back into a DataFrame. Once the data is in a DataFrame, you can clean it up so that it is ready to train the model.

In this example, we also aggregate the data per customer by Recency (days since the last purchase), Frequency (number of distinct invoices), and Monetary value (total amount spent) to build an RFM matrix.
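
To make the three metrics concrete, here is the same aggregation on a small hypothetical dataset. The customer IDs, invoices, dates, and amounts are made up. The sketch uses pandas named aggregation. Counting distinct invoices with nunique gives the same Frequency as the two-step groupby count in the next cell.

```python
import pandas as pd

# Hypothetical toy transactions: one row per invoice line
toy_tx = pd.DataFrame(
    {
        "customer_id": [1, 1, 1, 2],
        "invoice": ["A1", "A1", "A2", "B1"],
        "invoice_date": pd.to_datetime(["2011-12-01", "2011-12-01", "2011-12-05", "2011-11-09"]),
        "totalprice": [10.0, 5.0, 20.0, 7.5],
    }
)
toy_today = pd.Timestamp(2011, 12, 9)  # reference date for Recency

toy_rfm = toy_tx.groupby("customer_id").agg(
    Recency=("invoice_date", lambda d: (toy_today - d.max()).days),  # days since last purchase
    Frequency=("invoice", "nunique"),  # distinct invoices
    Monetary=("totalprice", "sum"),  # total spend
)
print(toy_rfm)
```

Customer 1 last bought on 2011-12-05 (Recency 4) across two invoices worth 35.0 in total. Customer 2 has one invoice from 30 days before the reference date.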

[ ]:
# Prepare data

# Read the table back from Amazon Keyspaces
r = session.execute("select * from " + keyspaces_schema + ".online_retail")

df = DataFrame(r)
df.head(100)

# Explore the data. Jupyter displays only the last expression in a cell,
# so wrap any of these lines in display() or print() to see its output.
df.count()
df["description"].nunique()
df["totalprice"] = df["quantity"] * df["price"]
df.groupby("invoice").agg({"totalprice": "sum"}).head()

df.groupby("description").agg({"price": "max"}).sort_values("price", ascending=False).head()
df.sort_values("price", ascending=False).head()
df["country"].value_counts().head()
df.groupby("country").agg({"totalprice": "sum"}).sort_values("totalprice", ascending=False).head()

# Invoice numbers that contain "C" are cancellations
returned = df[df["invoice"].str.contains("C", na=False)]
returned.sort_values("quantity", ascending=True).head()

# Drop rows with missing values and rows without a customer ID
df.isnull().sum()
df.dropna(inplace=True)
df.isnull().sum()
df.describe([0.01, 0.05, 0.25, 0.50, 0.75, 0.80, 0.90, 0.95, 0.99]).T
df.drop(df.loc[df["customer_id"] == ""].index, inplace=True)

# Recency metric: days between each customer's most recent invoice and a reference date.
# The full Online Retail II dataset ends on 2011-12-09, which serves as "today".
today_date = pd.Timestamp(2011, 12, 9)
df["customer_id"] = df["customer_id"].astype(int)

# Get the most recent invoice date for each customer
temp_df = df.groupby("customer_id").agg({"invoice_date": "max"})
# Convert the driver's date objects (via their string form) to datetime64 so that
# the subtraction below yields a timedelta column with a .dt.days accessor
temp_df["invoice_date"] = pd.to_datetime(temp_df["invoice_date"].astype(str))
temp_df["Recency"] = (today_date - temp_df["invoice_date"]).dt.days
recency_df = temp_df.drop(columns=["invoice_date"])
recency_df.head()

# Frequency Metric
temp_df = df.groupby(["customer_id", "invoice"]).agg({"invoice": "count"})
freq_df = temp_df.groupby("customer_id").agg({"invoice": "count"})
freq_df.rename(columns={"invoice": "Frequency"}, inplace=True)

# Monetary Metric
monetary_df = df.groupby("customer_id").agg({"totalprice": "sum"})
monetary_df.rename(columns={"totalprice": "Monetary"}, inplace=True)
rfm = pd.concat([recency_df, freq_df, monetary_df], axis=1)

# Score each RFM metric from 1 to 5 by quintile (5 is best; a low Recency scores high)
df = rfm
df["RecencyScore"] = pd.qcut(df["Recency"], 5, labels=[5, 4, 3, 2, 1])
df["FrequencyScore"] = pd.qcut(df["Frequency"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5])
df["Monetary"] = df["Monetary"].astype(int)
df["MonetaryScore"] = pd.qcut(df["Monetary"], 5, labels=[1, 2, 3, 4, 5])
df["RFM_SCORE"] = (
    df["RecencyScore"].astype(str)
    + df["FrequencyScore"].astype(str)
    + df["MonetaryScore"].astype(str)
)
# Map Recency + Frequency score pairs to customer segments
seg_map = {
    r"[1-2][1-2]": "Hibernating",
    r"[1-2][3-4]": "At Risk",
    r"[1-2]5": "Can't Lose",
    r"3[1-2]": "About to Sleep",
    r"33": "Need Attention",
    r"[3-4][4-5]": "Loyal Customers",
    r"41": "Promising",
    r"51": "New Customers",
    r"[4-5][2-3]": "Potential Loyalists",
    r"5[4-5]": "Champions",
}

df["Segment"] = df["RecencyScore"].astype(str) + df["FrequencyScore"].astype(str)
df["Segment"] = df["Segment"].replace(seg_map, regex=True)
df.head()
# Keep only the raw Recency, Frequency, and Monetary columns as model features
rfm = df.loc[:, "Recency":"Monetary"]
df.groupby("customer_id").agg({"Segment": "sum"}).head()

Now that we have the final dataset, we scale the RFM features and train a k-means model with six clusters.
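
Why scale first: k-means assigns points by Euclidean distance. Without scaling, Monetary (currency units, often in the thousands) would outweigh Recency (days) and Frequency (invoice counts). MinMaxScaler maps each column to [0, 1] via (x - min) / (max - min). The sketch below shows this on hypothetical toy RFM rows. The data, n_clusters=2, and random_state are assumptions chosen only for illustration.

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler

# Hypothetical RFM rows: [Recency (days), Frequency (invoices), Monetary (total spend)]
toy_rfm = np.array(
    [
        [2, 12, 5200.0],
        [5, 10, 4800.0],
        [300, 1, 40.0],
        [280, 2, 65.0],
    ]
)

# Each column becomes (x - min) / (max - min), so every feature spans [0, 1]
scaled = MinMaxScaler((0, 1)).fit_transform(toy_rfm)
print(scaled.min(axis=0), scaled.max(axis=0))

# Fixed n_init and random_state make the run reproducible
labels = KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(scaled)
print(labels)
```

The two recent, frequent, high-spending rows land in one cluster and the two lapsed, low-spending rows in the other.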

[ ]:
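# Training

# Scale each RFM feature to [0, 1] so that no single feature dominates the distance
sc = MinMaxScaler((0, 1))
df = sc.fit_transform(rfm)

# Cluster the customers into 6 groups with k-means (fixed seed for reproducibility)
kmeans = KMeans(n_clusters=6, n_init=10, random_state=0).fit(df)

# Cluster label for each customer ("kumeler" is Turkish for "clusters")
kumeler = kmeans.labels_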

Let’s visualize the data to see how records are distributed in different clusters.
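
Step 8 of the outline is viewing the mapping between customers and clusters. That mapping is a table indexed by customer ID, and counting its labels gives the cluster sizes that the plot shows. A minimal sketch on hypothetical IDs and labels follows. With the real data, the inputs would be rfm.index and kumeler from the cells above.

```python
import pandas as pd

# Hypothetical inputs: customer IDs (the RFM index) and k-means labels in the same order
customer_ids = pd.Index([12346, 12347, 12348, 12349, 12350], name="customer_id")
labels = [0, 2, 0, 1, 2]

# One row per customer with its assigned cluster
mapping = pd.DataFrame({"cluster": labels}, index=customer_ids)
print(mapping)

# Number of customers in each cluster, ordered by cluster label
cluster_sizes = mapping["cluster"].value_counts().sort_index()
print(cluster_sizes)
```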

[ ]:
# Visualize the clusters
import matplotlib.pyplot as plt

# Map each customer to its cluster
final_df = pd.DataFrame({"customer_id": rfm.index, "Kumeler": kumeler})

# Count customers per cluster (all six clusters; .head() would have dropped the last one)
cluster_counts = final_df.groupby("Kumeler").agg({"customer_id": "count"})
cluster_counts.rename(columns={"customer_id": "Total Customers"}).plot.bar(
    rot=70, title="RFM clustering"
)
plt.show(block=True);

In this step, we drop the keyspace, which also deletes its table, to prevent future charges.

[ ]:
deleteKeyspace = "DROP KEYSPACE IF EXISTS " + keyspaces_schema
dr = session.execute(deleteKeyspace)
time.sleep(5)
print(
    "Dropping %s keyspace.  It may take a few seconds to a minute to complete deletion of keyspace and table."
    % keyspaces_schema
)