Luis Caballero Diaz's profile

Spark Machine Learning Streaming Prediction

This project develops a machine learning model for text classification on a distributed data system and uses it to predict new streaming data. For that purpose, the following tools are used.

- Spark SQL and Spark DataFrames are used to operate with distributed data
- Spark MLlib is used to create the machine learning model
- Spark Structured Streaming is used to manage the prediction for new streaming data
- Kafka is used to manage the input data streams to be predicted

For reference, the work is done with PySpark on a Google Cloud cluster, and the data used in the project is available from the Stanford University website (dataset "reviews_Sports_and_Outdoors_5.json.gz").

The project work is split into the following sections.

SECTION 1: DISTRIBUTED DATA SYSTEM
SECTION 2: MACHINE LEARNING MODEL DEVELOPMENT
SECTION 3: NEW STREAMING DATA PREDICTION

SECTION 1: DISTRIBUTED DATA SYSTEM
This section gives a short introduction to distributed data systems.

A distributed data system, also known as a cluster, is basically a data storage network formed by a group of computers, each of which is responsible for storing only a limited part of the whole data. This approach is needed because of the increase in available data, a trend that is expected to become even more pronounced in the coming years.

Given this large amount of available data, a distributed framework is required to manage and operate the cluster containing the data of interest. The two main distributed frameworks are Hadoop and Spark. They are not interchangeable (one does not replace the other) and they can operate together.

Hadoop mainly consists of two components:
- HDFS --> a distributed storage system
- MapReduce --> a processing engine that distributes operations among the nodes of a cluster

Spark, instead, focuses on the processing engine and is typically much faster than MapReduce because it keeps intermediate data in RAM, whereas MapReduce writes intermediate results to disk. In addition, the Apache Spark framework provides components such as Spark SQL, Spark MLlib and Spark Structured Streaming, which offer development tools for a wider range of applications.

When storing a large volume of data in a distributed cluster, the data is split into smaller parts and each part is stored in a different node. For reference, HDFS keeps 3 replicas of each part by default to ensure a high level of availability even under a node failure. The general rule is to store only one replica of a part in the same node and at most two replicas of a part in the same rack. For example, the picture below depicts a cluster of 8 nodes arranged in 2 racks storing data split into four parts.
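
The placement rule above can be checked with a few lines of plain Python. This is only a sketch of the rule as stated in the text, not HDFS code; the node names, the rack layout and the function name `placement_is_valid` are hypothetical.

```python
from collections import Counter

def placement_is_valid(replicas, rack_of, max_per_rack=2):
    """Check the placement rule for the replicas of one data part.

    replicas: list of node names holding a replica of the part.
    rack_of:  dict mapping node name -> rack name (hypothetical layout).
    """
    # Rule 1: at most one replica of a part per node.
    if len(set(replicas)) != len(replicas):
        return False
    # Rule 2: at most two replicas of a part per rack.
    per_rack = Counter(rack_of[node] for node in replicas)
    return all(count <= max_per_rack for count in per_rack.values())

# Hypothetical cluster: 8 nodes in 2 racks (nodes 1-4 in rack1, 5-8 in rack2).
rack_of = {f"node{i}": ("rack1" if i <= 4 else "rack2") for i in range(1, 9)}

ok = placement_is_valid(["node1", "node2", "node5"], rack_of)
same_node_twice = placement_is_valid(["node1", "node1", "node5"], rack_of)
three_in_one_rack = placement_is_valid(["node1", "node2", "node3"], rack_of)
```

Only the first placement satisfies both rules; the other two break the per-node and per-rack limits respectively.
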
The Spark architecture consists of worker nodes, which run the executors, and one master node. It also includes a driver program that manages the Spark session. The driver program might run in a node of the cluster or in an external computer connected remotely to the Spark cluster. The execution is said to be in client mode when the driver program runs in an external computer, and in cluster mode when the driver program runs in a cluster node.
The main operating principle in Spark is lazy evaluation: no data is processed or transferred until an action explicitly requests a result, which maximizes code efficiency. In short, the system records the transformations required on the data, but they are not applied until really needed. Moreover, Spark provides the "cache" method to keep a transformed dataframe in memory if it will be used again, which avoids repeating the same transformations and speeds up the code execution.
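
The idea of recording transformations and applying them only on an action can be illustrated with a toy class. This is a plain-Python sketch, not the Spark API: `LazyData` and its `map`, `cache` and `collect` methods are hypothetical names that only mimic the behavior described above.

```python
class LazyData:
    """Toy stand-in for a lazily evaluated dataset (not the Spark API)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)   # recorded transformations, not yet applied
        self._cached = None
        self._use_cache = False

    def map(self, fn):
        # Transformation: only recorded, nothing is computed here.
        return LazyData(self._rows, self._steps + (fn,))

    def cache(self):
        # Ask for the result to be kept in memory once it has been computed.
        self._use_cache = True
        return self

    def collect(self):
        # Action: triggers the recorded transformations.
        if self._cached is not None:
            return self._cached
        result = list(self._rows)
        for fn in self._steps:
            result = [fn(row) for row in result]
        if self._use_cache:
            self._cached = result
        return result

calls = []

def double(x):
    calls.append(x)   # track how often the transformation really runs
    return 2 * x

data = LazyData([1, 2, 3]).map(double).cache()
calls_before_action = len(calls)   # nothing has run yet
first = data.collect()             # transformations run here
second = data.collect()            # served from the cache, no recomputation
```

The transformation runs three times in total (once per row), even though `collect` is called twice.
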

Note the current project was run on a simple cluster of three nodes (one master and two workers) created in Google Cloud and executed in client mode, using the command below.

gcloud beta dataproc clusters create <clustername> --enable-component-gateway --bucket <bucketname> --region europe-west1 --zone europe-west1-c --master-machine-type n1-standard-2 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-2 --worker-boot-disk-size 500 --image-version 2.0-debian10 --properties ^#^spark:spark.jars.repositories=https://repos.spark-packages.org/#spark:spark.jars.packages=graphframes:graphframes:0.8.2-spark3.1-s_2.12,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 --optional-components JUPYTER,ZOOKEEPER --initialization-actions 'gs://goog-dataproc-initialization-actions-europe-west1/kafka/kafka.sh' --project <projectname>
SECTION 2: MACHINE LEARNING MODEL DEVELOPMENT
The data to model is a set of product reviews from Amazon, each of which has the following information:

reviewerID - customer identifier
asin - product identifier
reviewerName - customer name
helpful - score to measure how helpful the review is
reviewText - review detailed text
overall - score between 1 and 5 provided by the customer
summary - review summary
unixReviewTime - time at which the review was created (expressed as Unix time)
reviewTime - time at which the review was submitted

An example of a sample in the dataset is depicted below.
The developed model focuses on review sentiment classification, so the features of interest are overall, reviewText and summary. Specifically, the model uses the concatenation of review text and summary as input feature to predict the overall score.

First, the data is downloaded, unzipped and stored in Google Cloud Storage.
Then, the data is read using the spark.read function, indicating the folder where the dataset is stored (Google Cloud Storage in this particular case) and asking it to infer the schema. Before showing the data, a new feature reviewTS is created by concatenating review text and summary.
To simplify the exercise, the reviews with a neutral score (overall == 3) are excluded. Thus, the review score will be 1, 2, 4 or 5, where 1 and 2 are considered negative (output class 0) and 4 and 5 are considered positive (output class 1).
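
The two preparation steps described so far (building reviewTS and dropping neutral reviews) can be sketched in plain Python. The project performs them on a Spark dataframe; the records below are hypothetical, and the space used as separator in the concatenation is an assumption, since the separator is not stated.

```python
# Hypothetical records with the three fields of interest.
reviews = [
    {"reviewText": "Works great on long rides.", "summary": "Great", "overall": 5.0},
    {"reviewText": "It is fine, nothing special.", "summary": "OK", "overall": 3.0},
    {"reviewText": "Broke after two uses.", "summary": "Poor quality", "overall": 1.0},
]

def add_review_ts(review, sep=" "):
    # Concatenate review text and summary into one input feature.
    # The separator is an assumption; the source does not state it.
    return {**review, "reviewTS": review["reviewText"] + sep + review["summary"]}

prepared = [add_review_ts(r) for r in reviews]

# Exclude neutral reviews (overall == 3); scores 1, 2, 4 and 5 remain.
non_neutral = [r for r in prepared if r["overall"] != 3.0]
```
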
Before modeling, an exploratory data analysis is performed to understand the class distribution in the dataset.

- There are almost 300k reviews with a mean score of 4.51, which already points to an imbalanced dataset.
- The max value is 5 and the min value is 1, as expected.
- There is a clear imbalance, since only 20k of the almost 300k reviews are negative.
The overall feature is transformed to binary using a Binarizer with 3 as threshold. The new column is called label and it is the model prediction target.
The imbalance is confirmed when applying a groupBy on the "label" column: more than 92% of the samples correspond to positive reviews. Therefore, the metric used is the area under the ROC curve, since accuracy might lead to wrong conclusions.
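
The thresholding rule and the reason why accuracy is misleading here can be shown with a small plain-Python sketch. The scores are hypothetical and only mimic a strong positive majority; `binarize` reproduces the rule of a threshold binarizer (values above the threshold map to 1.0, the rest to 0.0).

```python
from collections import Counter

def binarize(overall, threshold=3.0):
    # Values above the threshold map to 1.0, values at or below it map to 0.0.
    return 1.0 if overall > threshold else 0.0

# Hypothetical scores, chosen only to mimic a strong positive majority.
scores = [5, 5, 4, 5, 4, 5, 5, 4, 5, 5, 4, 5, 1, 2]
labels = [binarize(s) for s in scores]

counts = Counter(labels)
positive_share = counts[1.0] / len(labels)

# A model that always predicts the majority class gets an accuracy equal to
# the positive share without having learned anything.
always_positive = [1.0] * len(labels)
accuracy = sum(p == y for p, y in zip(always_positive, labels)) / len(labels)
```

With a positive share above 92%, such a trivial model would exceed 92% accuracy, which is why a ranking metric like the area under the ROC curve is preferred.
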
Next, the data is split between train and test sets, and it is important that both sets keep the same output class distribution. With imbalanced datasets, an unlucky split might leave one set with very few samples of the minority class, or with none at all. In practice, the randomSplit method from PySpark already yields very similar class proportions in both sets for a dataset of this size, although it does not stratify explicitly. The methods sampleBy and exceptAll might be used when the split must be stratified by class.
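
A stratified split can be sketched in plain Python by splitting each class separately. This is an illustration of the idea rather than the Spark code of the project; the function name `stratified_split`, the 80/20 ratio and the seed are assumptions.

```python
import random
from collections import defaultdict

def stratified_split(rows, label_key, train_fraction=0.8, seed=42):
    """Split rows so each class keeps the same train/test proportion."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for row in rows:
        by_class[row[label_key]].append(row)

    train, test = [], []
    for members in by_class.values():
        members = members[:]          # do not reorder the caller's data
        rng.shuffle(members)
        cut = round(len(members) * train_fraction)
        train.extend(members[:cut])
        test.extend(members[cut:])
    return train, test

# Hypothetical imbalanced data: 90 positives and 10 negatives.
rows = [{"id": i, "label": 1.0 if i < 90 else 0.0} for i in range(100)]
train, test = stratified_split(rows, "label")
```

Both resulting sets keep the 10% share of negative samples of the original data.
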
The input review text is tokenized and vectorized using a bag-of-words approach with a vocabulary size of 20000, so it focuses only on the 20000 most common words in the training set corpus. Stop words are also removed, that is, words that are very common in the language but carry little meaning on their own, such as "the", "and" or "but".
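
The tokenize, stop-word removal and vectorize steps can be sketched in plain Python as follows. The stop-word list, the regular expression and the tiny vocabulary size are assumptions made for illustration; the project uses the corresponding Spark MLlib transformers with a 20000-word vocabulary.

```python
import re
from collections import Counter

# Tiny illustrative stop-word list; a real list is much longer.
STOP_WORDS = {"the", "and", "but", "a", "is", "it", "this"}

def tokenize(text):
    # Lowercase, split on non-letters, drop stop words.
    words = re.findall(r"[a-z']+", text.lower())
    return [w for w in words if w not in STOP_WORDS]

def build_vocabulary(texts, vocab_size):
    # Keep only the vocab_size most frequent tokens of the training corpus.
    counts = Counter(token for text in texts for token in tokenize(text))
    return {word: idx for idx, (word, _) in enumerate(counts.most_common(vocab_size))}

def vectorize(text, vocabulary):
    # Sparse bag of words: {vocabulary index: count}; unknown words are ignored.
    counts = Counter(t for t in tokenize(text) if t in vocabulary)
    return {vocabulary[word]: n for word, n in counts.items()}

train_texts = ["The tent is great and light", "Great price but poor zipper", "Poor tent"]
vocabulary = build_vocabulary(train_texts, vocab_size=3)
vector = vectorize("This tent is great, really great", vocabulary)
```

The word "really" is dropped because it is not part of the vocabulary learned from the training texts, which is the same effect the vocabulary size limit has on rare words in the real corpus.
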
The machine learning model used in the present example is Logistic Regression, since linear models tend to train fast and perform acceptably on sparse feature matrices such as a bag of words. Note the regularization parameter is recommended to be >0 to avoid overfitting.
The whole process is captured in a pipeline, which is fitted with the training data.
The final step of model creation is validation. For that purpose, an evaluator is created to measure the area under the ROC curve for both train and test sets. Both sets reach an AUC around 55%, barely above the 50% of a random classifier, so the model cannot be accepted due to its lack of learning power. The imbalanced dataset might bias the model towards predicting almost all samples as positive (majority class), so the next step is to balance the data and repeat the simulation.
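
To make the metric concrete, the area under the ROC curve can be computed as the share of (positive, negative) pairs in which the positive sample receives the higher score. The sketch below is a plain-Python illustration with hypothetical labels and scores, not the Spark evaluator used in the project.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve as the share of (positive, negative) pairs
    in which the positive sample gets the higher score (ties count half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("both classes are needed to compute the AUC")
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical imbalanced labels: 8 positives and 2 negatives.
labels = [1, 1, 1, 1, 1, 1, 1, 1, 0, 0]

# A model that gives every review the same score cannot rank the classes.
constant_scores = [0.9] * len(labels)
auc_constant = roc_auc(labels, constant_scores)

# A model that ranks every positive above every negative.
good_scores = [0.9, 0.8, 0.95, 0.7, 0.85, 0.9, 0.75, 0.8, 0.2, 0.3]
auc_good = roc_auc(labels, good_scores)
```

The constant model would reach 80% accuracy on this data at a 0.5 cut-off, yet its AUC is 0.5, while the model that separates the classes reaches 1.0.
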
BALANCING DATA
To balance the classes, and given the large number of samples in the present dataset, random undersampling is applied to reduce the majority class to only 10% of the total samples. More sophisticated undersampling techniques, such as cluster centroids or Tomek links, might also be applied to decide which majority class samples to discard.
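
Random undersampling can be sketched in plain Python as keeping all minority samples and a random fraction of the majority samples. The function name `undersample_majority`, the seed and the reading of the 10% figure as the fraction of majority samples to keep are assumptions; the data below is hypothetical.

```python
import random

def undersample_majority(rows, label_key, majority_label, keep_fraction, seed=42):
    """Keep every minority row and a random keep_fraction of the majority rows."""
    rng = random.Random(seed)
    majority = [r for r in rows if r[label_key] == majority_label]
    minority = [r for r in rows if r[label_key] != majority_label]
    kept = rng.sample(majority, round(len(majority) * keep_fraction))
    return minority + kept

# Hypothetical data with roughly the imbalance described above (92% positive).
rows = [{"id": i, "label": 1.0 if i < 920 else 0.0} for i in range(1000)]
balanced = undersample_majority(rows, "label", majority_label=1.0, keep_fraction=0.1)

positives = sum(r["label"] == 1.0 for r in balanced)
negatives = sum(r["label"] == 0.0 for r in balanced)
```

After undersampling, the two classes have a similar number of samples (92 positives and 80 negatives in this toy case).
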
With the new balanced data, the stratified split between train and test sets is applied again, which yields balanced train and test sets.
When the model trained with balanced data is reassessed, the results are much better, reaching an area under the ROC curve of 84% for both train and test sets. That validates the model and allows moving to the next design stage.
As an additional exercise, the most significant words for both positive and negative reviews can be calculated from the logistic regression weights.
For reference, the words with the most negative coefficients are waste, returned, poor, disappointing, useless... and the words with the most positive coefficients are great, perfect, easy, excellent... The two lists clearly reflect negative and positive sentiment respectively, which shows that the model has learned meaningful patterns.
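
Ranking words by their coefficient amounts to sorting the vocabulary by weight. The sketch below shows the idea in plain Python; the vocabulary and the coefficient values are hypothetical placeholders, since the real values come from the fitted vectorizer and logistic regression stages.

```python
def top_words(vocabulary, weights, n=3):
    """Return the n most negative and n most positive words by coefficient."""
    ranked = sorted(zip(vocabulary, weights), key=lambda pair: pair[1])
    most_negative = [word for word, _ in ranked[:n]]
    most_positive = [word for word, _ in reversed(ranked[-n:])]
    return most_negative, most_positive

# Hypothetical vocabulary and coefficients, for illustration only.
vocabulary = ["great", "waste", "tent", "perfect", "returned", "easy", "poor"]
weights = [1.8, -2.1, 0.05, 1.5, -1.7, 1.1, -1.4]
negative, positive = top_words(vocabulary, weights)
```

Words with coefficients close to zero, such as "tent" in this toy case, carry little sentiment and appear in neither list.
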
NOTE: when pursuing maximum model performance, a cross-validation assessment might be run across a parameter grid, using the code below as reference.
SECTION 3: NEW STREAMING DATA PREDICTION
This section focuses on using the model developed in the previous section to predict new streaming data.

There are two main approaches to predict streaming data.

1. PREDICT WITH GENERATED FILES
This approach requires an external process that creates input files in a folder (for example, an HDFS folder in the cluster). A routine then automatically checks the folder for new files and predicts the new data they contain. A sketch of this approach is depicted next, but it is not very common in real production.
2. PREDICT WITH DATAFRAME STREAMING UPDATE
This approach requires an external framework that receives the new data to predict and adds it as new samples to the dataframe under operation. Thus, the new samples are predicted in the next prediction run on the dataframe.
This approach is more popular in professional environments since it can handle multiple data streams. In the current exercise, Apache Kafka is used as the interface with Spark Structured Streaming, as depicted below. However, instead of having real data streams, the new samples are typed manually in the Kafka console to emulate the data streams.
First, a Kafka topic needs to be created with the command below, run in the command line of the master node of the cluster. The topic is named "reviews".

/usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic reviews

Then, a Kafka console producer is launched for this topic.

/usr/lib/kafka/bin/kafka-console-producer.sh --broker-list <clustername>-w-0:9092 --topic reviews
Second, the cluster must be configured to subscribe to the created Kafka topic and launch predictions when new streaming data is provided.
The streaming data arrives in the field "value", so the structure of that field must be defined according to the expected streaming data. Note the structure cannot be inferred, and the field is of binary type by default.
Since this exercise emulates data streams with the Kafka console, the streaming data structure is customizable. For simplicity, the streaming data comes as JSON with a key "reviewTS" and a string value corresponding to the review. An example of the streaming data is as follows.

{"reviewTS": "I am happy with the purchase, it might be better, but it makes the expected work"}

Therefore, the value column is cast to string type. Then, the JSON content of the value field is parsed and stored in a new structured column called "reviewTS", which is finally overwritten with the text found under the reviewTS key of the streaming message. In short, this process adds a new column "reviewTS" containing the string message typed in the Kafka console, which the developed model needs to apply its transformations.
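
The decoding steps above (binary value, then string, then JSON, then the reviewTS text) can be sketched for a single message in plain Python. The project applies the equivalent column operations on the streaming dataframe; the function name `extract_review_ts` and the choice to return None for malformed messages are assumptions.

```python
import json

def extract_review_ts(value: bytes):
    """Decode a Kafka message value and return the text under "reviewTS".

    Returns None when the message is not valid JSON or lacks the key.
    """
    try:
        # Cast the binary value to a string, then parse it as JSON.
        payload = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    return payload.get("reviewTS")

message = b'{"reviewTS": "I am happy with the purchase, it might be better, but it makes the expected work"}'
review_ts = extract_review_ts(message)
malformed = extract_review_ts(b"not json")
```
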
Once the streaming dataframe is properly configured, it is transformed with the pipeline model, and the streaming write process is initiated using the start method under the query name "predictions". The query is defined to append the new predictions and save them in memory (in a real application, new predictions would be saved in a database, not in memory). To check the new predictions, the temporary predictions dataframe must be refreshed. In the first update below, the table is empty because no streaming data has been sent yet.
SENDING NEW STREAMING DATA
The following negative review is typed in the Kafka console:
{"reviewTS": "This was the worst purchase I did, it broke very fast, it is useless!"}

After introducing the review, the information below is obtained when updating the predicted dataframe. The prediction for the review is the negative class (0), so the model performed well.
Now a positive review is typed in the Kafka console:
{"reviewTS": "I am happy with the purchase, it might be better, but it makes the expected work"}

When updating the predicted dataframe, a new review prediction has been appended (as defined by the outputMode). The model predicted the positive class (1) for this new review, so the model also performed well.
I appreciate your attention and I hope you find this work interesting.

Luis Caballero