
🐻🎷 Scalable Machine Learning (Recommender System) with Apache Beam, Google Dataflow and TensorFlow


Deep Dive Into Production ML workflow for Recommender System on GCP

🐻🎷 Scalable Collaborative Filtering End-to-end Model deployed on Google Cloud (GCS, Dataflow, Apache Beam, AI Platform)

License

MIT License

Project Report

Link

System Requirements

  • Python 3.6
  • tensorflow 1.15.0
  • tensorflow-transform 0.21.2
  • apache-beam[gcp] 2.17.0

Billable Google Cloud Services

  • ML framework: TensorFlow
  • Unified data processing model: Apache Beam
  • Data pipeline (DAG) execution service: Google Dataflow
  • ML training back end: Google AI Platform
  • Storage: Google Cloud Storage
  • Dataset: MovieLens 100k, MovieLens 25M

GCP setup

  • project name: reco-sys-capstone
  • GCS bucket name: ahsu-movielens
  • region: us-central1
  • AI Platform job name format: "ahsu_movielens_$(date +%Y%m%d_%H%M%S)"

How-to

$ # set up google cloud CLI tools
$ ./run-cloud.sh
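
run-cloud.sh wraps the Cloud setup, the Dataflow preprocessing run, and the AI Platform job submission. Below is a minimal sketch of that flow, assuming the project, bucket, region, and job-name values listed under "GCP setup"; the exact steps and flag names in the real script (in particular the preprocess.py flags) may differ.

    # Sketch only: assumed flow of run-cloud.sh, not its exact contents.
    gcloud auth login
    gcloud config set project reco-sys-capstone
    gsutil mb -l us-central1 gs://ahsu-movielens    # create the bucket once

    # Run the Beam preprocessing pipeline on Dataflow
    # (preprocess.py flag names below are assumptions).
    python preprocess.py \
      --project reco-sys-capstone \
      --region us-central1 \
      --work-dir gs://ahsu-movielens \
      --runner DataflowRunner

    # Submit the training job to AI Platform.
    JOB_NAME="ahsu_movielens_$(date +%Y%m%d_%H%M%S)"
    gcloud ai-platform jobs submit training "$JOB_NAME" \
      --region us-central1 \
      --module-name trainer.task \
      --package-path trainer/ \
      --config config.yaml \
      --staging-bucket gs://ahsu-movielens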

Directory Tree

(ahsu) adrianhsu:~/Desktop/reco-sys-on-gcp (main)
$ tree
.
├── README.md
├── data-extractor.py
├── fig/
├── gitpush.sh
├── loss.txt
├── movielens.zip
├── plot.py
├── preprocess.py
├── run-cloud.sh
├── setup.py
├── config.yaml
└── trainer
    ├── __init__.py
    └── task.py

Google Dataflow DAG

Figure: Google Dataflow pipeline DAG.

Step 1: Read and preprocess the data

    dataset = (
      p
      | 'Read from GCS' >> beam.io.ReadFromText(os.path.join(data_dir, FILENAME), skip_header_lines = SKIP_HEADER)
      | 'Trim spaces' >> beam.Map(lambda x: x.split(",") if SPLIT_CHAR else x.split())
      | 'Format to dict' >> beam.Map(lambda x: {"user": x[0], "item": x[1], "rating": x[2]})
      | 'Shift by 1' >> beam.Map(shift_by_one)
    )
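
The shift_by_one helper is referenced above but not shown. A minimal sketch follows, assuming it converts MovieLens' 1-indexed user/item IDs to 0-based integer indices and casts the rating to float; the real preprocess.py may differ.

    # Hypothetical sketch of shift_by_one (assumed behavior, not the repo's exact code):
    # MovieLens IDs start at 1, so shift them to 0-based indices for embedding lookups.
    def shift_by_one(element):
        return {
            "user": int(element["user"]) - 1,
            "item": int(element["item"]) - 1,
            "rating": float(element["rating"]),
        }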

Step 2: Convert records to tf.train.Example and split into train and eval sets

    train_dataset, eval_dataset = (
      dataset
      | 'DataToTfExample' >> beam.ParDo(data_to_tfexample)
      | 'SerializeProto' >> beam.Map(lambda x: x.SerializeToString())
      | 'Split dataset' >> beam.Partition(partitioning, 2, eval_percent) # train, eval -> 2
    ) 
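
For context, here are hedged sketches of the two helpers used above. The names match the pipeline; the bodies are assumptions rather than the repository's exact code (eval_percent is assumed to be a percentage in [0, 100]).

    import random
    import tensorflow as tf
    import apache_beam as beam

    class DataToTfExampleFn(beam.DoFn):
        """Wraps one rating dict into a tf.train.Example proto."""
        def process(self, element):
            yield tf.train.Example(features=tf.train.Features(feature={
                "user": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[element["user"]])),
                "item": tf.train.Feature(
                    int64_list=tf.train.Int64List(value=[element["item"]])),
                "rating": tf.train.Feature(
                    float_list=tf.train.FloatList(value=[element["rating"]])),
            }))

    data_to_tfexample = DataToTfExampleFn()

    def partitioning(element, num_partitions, eval_percent):
        """Routes roughly eval_percent% of records to partition 1 (eval), the rest to 0 (train)."""
        return 1 if random.uniform(0, 100) < eval_percent else 0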

Step 3: Write to GCS in TFRecord format

    train_dataset_prefix = os.path.join(train_dataset_dir, 'part')
    _ = (
      train_dataset
      | 'Write train dataset' >> tfrecordio.WriteToTFRecord(train_dataset_prefix)
      # | 'Write train dataset' >> beam.io.textio.WriteToText(train_dataset_prefix)
    )

    eval_dataset_prefix = os.path.join(eval_dataset_dir, 'part')
    _ = ( 
      eval_dataset
      | 'Write eval dataset' >> tfrecordio.WriteToTFRecord(eval_dataset_prefix)
      # | 'Write eval dataset' >> beam.io.textio.WriteToText(eval_dataset_prefix)
    )
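
On the training side, trainer/task.py then reads these shards back from GCS. A minimal sketch of that input pipeline, assuming TF 1.15 and the feature names used above (the paths and function names are illustrative, not the exact trainer code):

    import tensorflow as tf

    def parse_example(serialized):
        # Feature spec mirrors the tf.train.Example written by the Beam pipeline.
        features = {
            "user": tf.io.FixedLenFeature([], tf.int64),
            "item": tf.io.FixedLenFeature([], tf.int64),
            "rating": tf.io.FixedLenFeature([], tf.float32),
        }
        return tf.io.parse_single_example(serialized, features)

    def make_input_fn(file_pattern, batch_size=256):
        # e.g. file_pattern = "gs://ahsu-movielens/train-dataset/part-*" (path is an assumption)
        def input_fn():
            files = tf.io.gfile.glob(file_pattern)
            dataset = tf.data.TFRecordDataset(files)
            return dataset.map(parse_example).shuffle(10000).batch(batch_size).repeat()
        return input_fn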

Model performance: MLE Loss function
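
The loss curve shown here presumably comes from plot.py reading loss.txt in the repo. A hypothetical sketch, assuming one loss value per line in loss.txt (the actual file format and script contents may differ):

    import matplotlib.pyplot as plt

    # Assumed format: one loss value per training step/epoch on each line of loss.txt.
    with open("loss.txt") as f:
        losses = [float(line) for line in f if line.strip()]

    plt.plot(losses)
    plt.xlabel("Training step")
    plt.ylabel("Loss")
    plt.savefig("fig/loss.png")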

Cloud Resources Utilization

ahsu_movielens_20201026_171017

  • Dataset: MovieLens 100k
  • Machine Type: n1-standard-4
  • Virtual CPUs: 4
  • Memory (GB): 15
  • Only 1 master node
  • Execution Time: 6 min 26 sec
  • Consumed ML units: 0.06

ahsu_movielens_20201026_215825

  • Dataset: MovieLens 25M
  • Machine Type: n1-highmem-16
  • Virtual CPUs: 16
  • Memory (GB): 104
  • Only 1 master node
  • Execution Time: 3 hr 4 min
  • Consumed ML units: 5.76

2 workers: ahsu_movielens_20201029_235922

  • Dataset: MovieLens 100k
  • Machine Type: n1-standard-4
  • Virtual CPUs: 4
  • Memory (GB): 15
  • 1 master node, 2 worker nodes
  • Execution Time: 5 min 31 sec
  • Consumed ML units: 0.1

4 workers: ahsu_movielens_20201029_235935

  • Dataset: MovieLens 100k
  • Machine Type: n1-standard-4
  • Virtual CPUs: 4
  • Memory (GB): 15
  • 1 master node, 4 worker nodes
  • Execution Time: 5 min 18 sec
  • Consumed ML units: 0.32

2 workers: ahsu_movielens_20201030_000018

  • Dataset: MovieLens 25M
  • Machine Type: n1-highmem-16
  • Virtual CPUs: 16
  • Memory (GB): 104
  • 1 master node, 2 worker nodes
  • Execution Time: 2 hr 57 min
  • Consumed ML units: 15.53

4 workers: ahsu_movielens_20201030_000729

  • Dataset: MovieLens 25M
  • Machine Type: n1-highmem-16
  • Virtual CPUs: 16
  • Memory (GB): 104
  • 1 master node, 4 worker nodes
  • Execution Time: 2 hr 26 min
  • Consumed ML units: 22.68

References

[1] Herlocker, J., Konstan, J., Borchers, A., and Riedl, J. An Algorithmic Framework for Performing Collaborative Filtering. In Proceedings of the Conference on Research and Development in Information Retrieval, 1999.

http://files.grouplens.org/papers/algs.pdf

[2] H. Robbins et al., A Stochastic Approximation Method, 1951

https://projecteuclid.org/download/pdf_1/euclid.aoms/1177729586

[3] Léon Bottou, Large-Scale Machine Learning with Stochastic Gradient Descent, COMPSTAT'2010

https://leon.bottou.org/publications/pdf/compstat-2010.pdf

[4] Google Cloud | Large-Scale Recommendation Systems - What and Why?

https://developers.google.com/machine-learning/recommendation/collaborative/basics

[5] Google Cloud | Machine Learning with Apache Beam and TensorFlow (Molecules Walkthrough)

https://cloud.google.com/dataflow/docs/samples/molecules-walkthrough

[6] Apache Beam: An advanced unified programming model

https://beam.apache.org/

[7] Google Dataflow: Unified stream and batch data processing that's serverless, fast, and cost-effective

https://cloud.google.com/dataflow

[8] Martín Abadi et al., TensorFlow: Large-Scale Machine Learning on Heterogeneous Distributed Systems, 2015

http://download.tensorflow.org/paper/whitepaper2015.pdf

[9] Tyler Akidau et al, The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
