Adding new ML Example (#1)
* Adding new ML Example

New Example

* Google Search

* Delete googleafb094859f465ed0 (1).html

* Google Search Index

* Update README.md

* Delete googleafb094859f465ed0.html

* Create googleafb094859f465ed0.html
NoSQLGuy authored Oct 26, 2021
1 parent 0b007cf commit f61caa2
Showing 5 changed files with 247 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -67,3 +67,5 @@ request, please [review our contribution guide](./CONTRIBUTING.md).
## License

See [LICENSE](./LICENSE.txt)


1 change: 1 addition & 0 deletions googleafb094859f465ed0.html
@@ -0,0 +1 @@
google-site-verification: googleafb094859f465ed0.html
67 changes: 67 additions & 0 deletions python/train_mllib_model/README.md
@@ -0,0 +1,67 @@
# Overview

An important task in ML is model selection: using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LogisticRegression, or for entire Pipelines that include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. This example builds such a Pipeline to score customer profiles using a "Recency, Frequency, Monetary Value" (RFM) metric.

## Prerequisites

Before you begin:

* Ensure your tenant is configured according to the [set up admin](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin) instructions.
* Know your object store namespace.
* (Optional, strongly recommended): Install Spark to test your code locally before deploying.

## Load Required Data

Upload a sample CSV file to OCI object store.

## Application Setup

1. Recommended: run the sample locally to test it.
2. Upload the sample CSV file to object store.
3. Upload ```train_mllib_model.py``` to an object store bucket.
4. Create a Python Data Flow application pointing to ```train_mllib_model.py```. Refer [here](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_data_flow_library.htm#create_pyspark_app) for instructions.

## Run the Application using OCI Cloud Shell or OCI CLI

### Deploy the application to Object Storage

Before deploying the application, set the following variables:

```sh
BUCKET=<Enter an OCI object storage bucket here>
COMPARTMENTID=<Enter an OCI compartment here>
NAMESPACE=<Enter your OCI namespace here>
```

```sh
oci os object put --bucket-name $BUCKET --file train_mllib_model.py
```

### Upload the training data

```sh
oci os object put --bucket-name $BUCKET --file moviestream_subset.csv
```

### Run the Application, adjusting executor shape and count as needed

```sh
oci data-flow run submit \
--compartment-id $COMPARTMENTID \
--executor-shape VM.Standard2.1 \
--num-executors 2 \
--execute "oci://$BUCKET@$NAMESPACE/train_mllib_model.py --input oci://$BUCKET@$NAMESPACE/moviestream_subset.csv --output oci://$BUCKET@$NAMESPACE/scores.csv"
```

Make note of the OCID that is returned in the "id" field.
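If you drive this from a script, the run OCID can be captured from the CLI's JSON output with `jq`. The JSON below is a hand-written sample of the response shape, and the OCID in it is hypothetical; in a real script you would populate `RUN_JSON` from the `oci data-flow run submit` call itself.

```sh
# Sample of the JSON shape the CLI returns (the OCID here is made up).
# In a real script: RUN_JSON=$(oci data-flow run submit ... )
RUN_JSON='{"data": {"id": "ocid1.dataflowrun.oc1..exampleid", "lifecycle-state": "ACCEPTED"}}'

# Extract the run OCID from the "id" field for later status checks.
RUN_ID=$(echo "$RUN_JSON" | jq -r '.data.id')
echo "$RUN_ID"
```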

### Check the status

```sh
oci data-flow run get --run-id <ocid>
```
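Rather than re-running `get` by hand, you can poll until the run reaches a terminal state. This is a sketch: `fetch_state` is a stub standing in for the real CLI call (shown in the comment), which assumes the CLI's JMESPath `--query`/`--raw-output` options.

```sh
# Stub for illustration; in a real script replace the body with:
#   oci data-flow run get --run-id "$RUN_ID" \
#     --query 'data."lifecycle-state"' --raw-output
fetch_state() { echo "SUCCEEDED"; }

# Poll until the run leaves the in-progress states.
while true; do
  STATE=$(fetch_state)
  echo "Run state: $STATE"
  case "$STATE" in
    SUCCEEDED|FAILED|CANCELED) break ;;
  esac
  sleep 30
done
```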

### When the lifecycle state of that run reaches SUCCEEDED, fetch the application output

```sh
oci data-flow run get-log --run-id <ocid> --name spark_application_stdout.log.gz --file -
```
Empty file.
177 changes: 177 additions & 0 deletions python/train_mllib_model/train_mllib_model.py
@@ -0,0 +1,177 @@
#!/usr/bin/env python3

# This example scores customer profiles using a "Recency, Frequency,
# Monetary Value" (RFM) metric.
#
# The approach:
# 1. The data is augmented with continuous scores for each of these metrics.
# 2. These continuous variables are grouped into 4 clusters using a K-Means
#    clustering algorithm.
# 3. Cluster identifiers are used to assign a score along each dimension.
# 4. Scores across each dimension are combined to make an aggregate score.
#
# This code was inspired by https://towardsdatascience.com/data-driven-growth-with-python-part-2-customer-segmentation-5c019d150444
#
# Data used in this example comes from Oracle's "Moviestream" dataset.

import argparse

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
avg,
col,
expr,
max,
row_number,
)
from pyspark.sql.window import Window

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

parser = argparse.ArgumentParser()
parser.add_argument("--input", default="moviestream_subset.csv")
parser.add_argument("--output", default="customer_scores.csv")
args = parser.parse_args()

spark = SparkSession.builder.appName("rfm_segmentation").getOrCreate()

# Load our raw transaction data.
raw_df = (
spark.read.format("csv")
.load(args.input, header="true")
.withColumn("day", col("day").cast("date"))
)

# Compute continuous values for recency, frequency and monetary value.
customer_df = (
raw_df.groupBy("cust_id")
.agg({"day": "max", "actual_price": "sum", "movie_id": "count"})
.withColumn("recency", expr('datediff(to_date("2020-12-31"), `max(day)`)'))
.toDF(*"cust_id monetary frequency most_recent recency_days".split())
)

# MLlib requires variables to be in vectors, even if there is only
# one value. These code blocks take our RFM metrics and place them
# into vectors.
#
# This code does not run immediately; it runs during the ML Pipeline.
recency_assembler = VectorAssembler(
inputCols=["recency_days"],
outputCol="recency_feature",
handleInvalid="skip",
)
frequency_assembler = VectorAssembler(
inputCols=["frequency"],
outputCol="frequency_feature",
handleInvalid="skip",
)
monetary_assembler = VectorAssembler(
inputCols=["monetary"],
outputCol="monetary_feature",
handleInvalid="skip",
)

# Define our k-means models for Recency, Frequency and Monetary Value.
# Each one clusters its metric into a fixed 4 groups. As before, these
# will not execute immediately but during the Pipeline.
recency_kmeans = KMeans(
featuresCol="recency_feature", predictionCol="recency_cluster"
).setK(4)
frequency_kmeans = KMeans(
featuresCol="frequency_feature", predictionCol="frequency_cluster"
).setK(4)
monetary_kmeans = KMeans(
featuresCol="monetary_feature", predictionCol="monetary_cluster"
).setK(4)

# Define the pipeline of all the steps we need in our ML process.
# First we run all assemblers, then we run all KMeans clustering
# processes.
#
# This Pipeline object behaves like an Estimator insofar as you can
# call .fit() on the Pipeline and it will call .fit() on the
# estimators within the pipeline.
#
# See https://spark.apache.org/docs/latest/ml-pipeline.html for more
# information on MLlib Pipelines.
pipeline = Pipeline(
stages=[
recency_assembler,
frequency_assembler,
monetary_assembler,
recency_kmeans,
frequency_kmeans,
monetary_kmeans,
]
)

# Fitting the pipeline gives us our ML model.
model = pipeline.fit(customer_df)

# Make predictions with our model. In MLlib this adds columns
# to our input DataFrame. We specified the new column names in the
# KMeans models; for example, a column named "recency_cluster" will
# be added because we specified it in the "recency_kmeans" model.
predictions = model.transform(customer_df)

# Each customer is assigned a cluster ID (0 through 3) along
# each dimension. Unfortunately these cluster IDs are assigned
# arbitrarily. We want the cluster representing "best" to be given
# a score of 4 and "worst" a score of 1.
#
# To do this we create mapping DataFrames that map cluster IDs
# to the desired score. We create one DataFrame for each of
# Recency, Frequency and Monetary.
recency_ranks = (
predictions.groupBy("recency_cluster")
.agg(max("recency_days").alias("max_recency_days"))
.withColumn(
"recency_score",
row_number().over(Window().orderBy(col("max_recency_days").desc())),
)
)
frequency_ranks = (
predictions.groupBy("frequency_cluster")
.agg(max("frequency").alias("max_frequency"))
.withColumn(
"frequency_score", row_number().over(Window().orderBy(col("max_frequency")))
)
)
monetary_ranks = (
predictions.groupBy("monetary_cluster")
.agg(max("monetary").alias("max_monetary"))
.withColumn(
"monetary_score",
row_number().over(Window().orderBy(col("max_monetary"))),
)
)

# Pare the DataFrame down to just the columns we care about.
target_columns = "cust_id monetary frequency most_recent recency_score frequency_score monetary_score".split()

# Compute our scored dataset. This code:
# 1. Maps the cluster scores to meaningful ranks.
# 2. Adds the 3 meaningful ranks into an aggregate score.
rfm = (
predictions.join(recency_ranks, "recency_cluster")
.join(frequency_ranks, "frequency_cluster")
.join(monetary_ranks, "monetary_cluster")
.select(target_columns)
.withColumn("score", expr("recency_score + frequency_score + monetary_score"))
)

# Cache so the DataFrame is not recomputed for each action below
# (the write, the count, and the aggregate).
rfm.cache()

# Save the full scored dataset to CSV format.
rfm.write.mode("overwrite").option("header", True).csv(args.output)

# Print out a summary of what happened.
print(
"Scored {} rows with average score of {}".format(
rfm.count(), rfm.agg(avg("score")).first()[0]
)
)
print("Data written to {}".format(args.output))
