HeraSDG-BigDataAnalyticsPipeline

This repository contains my thesis project for the Master's Degree in Computer Engineering.
Here is the documentation to reproduce the project. Have fun!
Description:
The project consists of a Big Data Analytics Pipeline that can be used to collect, store, and analyze the data produced by the Hera Smart Community.

1. Setup the Kubernetes cluster

First, you have to create a K8s cluster on which to deploy the Big Data Analytics pipeline.
You can either set up a local environment with a single-node virtual cluster (e.g. minikube), or set up a real cluster.

2. Install Helm

Now you have to install Helm, a package manager for K8s that makes it easier to deploy software on the cluster.

You can follow this installation guide.

Then, you have to configure kubectl to talk to the right cluster: Helm, in fact, refers to the current kubectl context settings.

Note:
If you installed minikube and you have only that cluster, kubectl is already configured.
Otherwise, if you have more than one K8s cluster installed, you should configure kubectl to talk to the right cluster:

If you haven't already, follow steps 4, 5, 6, and 7 of Configure your laptop to act as an external cluster workstation.


The initial setup ends here. You can start to deploy the pipeline components!

3. Deploy HDFS on cluster (using Helm)

The Helm chart I used deploys an HDFS 3.2.1 cluster with a namenode and 3 datanodes.
The replication factor is set to 3, and the block size is 128 MB.

  • Firstly, add the gaffer/ Helm repository to your local repository list:
    helm repo add gaffer https://gchq.github.io/gaffer-docker

  • Then, deploy a gaffer/hdfs release on the cluster, providing the custom values in the file /HDFS/my-hdfs-values.yaml:
    helm install -f ./HDFS/my-hdfs-values.yaml my-hdfs gaffer/hdfs --version 0.10.0

  • If you want, you can create a port-forward to access the HDFS namenode web UI:
    kubectl port-forward -n default svc/my-hdfs-namenodes 9870:9870

    Then open the UI in your browser:

    open http://localhost:9870

Debugging: You can use the hdfs-shell Pod to execute some useful commands on the deployed HDFS.

POSSIBLE ERRORS:
After an unexpected stop of the HDFS connector (e.g. after a computer freeze), it can happen that the connector doesn't restart correctly and prints this ERROR in the log:

ERROR Recovery failed at state RECOVERY_PARTITION_PAUSED (io.confluent.connect.hdfs3.TopicPartitionWriter:273)
org.apache.kafka.connect.errors.DataException: Error creating writer for log file hdfs://my-hdfs-namenodes:8020//tmp/utenti/0/log
.
.
.
Caused by: org.apache.hadoop.hdfs.CannotObtainBlockLengthException: Cannot obtain block length for LocatedBlock ... of <file>.

This happens because a file was left in an open-for-write state and was not closed correctly when the previous connector stopped.

You can solve it as follows:
Use fsck (filesystem check) to run the DFS filesystem checking utility on a particular directory, in order to check whether there are files still open for write:
hdfs fsck / or hdfs fsck / -openforwrite
If so, ask the namenode to recover the lease for that file:
hdfs debug recoverLease -path /tmp/premi/0/log

4. Deploy Kafka on cluster (using Helm)

  • Firstly, add the bitnami/ Helm repository to your local repository list:
    helm repo add bitnami https://charts.bitnami.com/bitnami

  • Then, deploy a bitnami/kafka release on the cluster, providing the custom values in the file /Kafka/my-kafka-values.yaml:
    helm install -f ./Kafka/my-kafka-values.yaml my-kafka bitnami/kafka

    Set up the connection between the data_source (external to the cluster) and Kafka (internal to the cluster):

    Kafka is reachable through a K8s NodePort Service that exposes port 30001 on all the nodes of the cluster.
    This port proxies incoming connections to the my-kafka-0-external service on port 9094; the service, in turn, forwards the connection to the my-kafka-0 Pod, which is the Kafka broker.
    However, the minikube node has an internal IP that is not reachable from the laptop where data_source.py is executed. A possible way to resolve this issue is:

    1. Open a console and run the following commands:
      minikube service my-kafka-0-external --url
    2. Open a second console and run:
      kubectl port-forward svc/my-kafka-0-external 30001:9094
    3. Now open a third console and execute the data_source.py:
      python data_source.py <bootstrap-server:port> <topic>
      where bootstrap-server is the URL returned by the minikube service command and topic is the topic on which we want to publish data.

    Keep in mind that the first connection to the bootstrap server returns to the producer some metadata with the addresses at which the broker (or brokers) can be reached. These addresses are called advertised listeners and are configurable in the server.properties file of the Kafka broker. The advertised listener returned to the client is then used to establish the connection over which the data is sent.
    In my configuration, the advertised listener returned is 127.0.0.1:30001. The host address is configurable through the .Values.externalAccess.service.domain value of my-kafka-values.yaml; it determines how the scripts-configmap is templated and, consequently, how server.properties is written.
    For more details, consult the scripts-configmap.yaml source file of the bitnami/kafka Helm chart.
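
    To make the flow concrete, here is a minimal producer sketch of what a script like data_source.py might do, assuming the kafka-python package; the message payload below is purely illustrative, not the project's actual schema:

      # Minimal Kafka producer sketch (assumes the kafka-python package).
      # The bootstrap server is the URL returned by `minikube service
      # my-kafka-0-external --url`; the payload is an illustrative example.
      import json
      import sys

      from kafka import KafkaProducer

      bootstrap_server = sys.argv[1]   # e.g. "127.0.0.1:30001"
      topic = sys.argv[2]              # e.g. "utenti"

      producer = KafkaProducer(
          bootstrap_servers=[bootstrap_server],
          value_serializer=lambda v: json.dumps(v).encode("utf-8"),
      )

      # The address actually used for the send is the advertised listener
      # (127.0.0.1:30001) returned in the cluster metadata.
      producer.send(topic, {"sensor_id": 1, "value": 42})
      producer.flush()
      producer.close()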

    Launch a Kafka Debugger Pod with Kafkacat (optional):

    1. Launch the Pod:
      kubectl run kafkacat-debugger --image=edenhill/kafkacat:1.6.0 --restart=Never --command -- sleep 999d
    2. Open a shell in the Pod and use the Kafkacat tool.
      For example, to inspect the metadata returned to the client:
      kafkacat -b <bootstrap-server:port> -t <topic> -L

NOTE: The Confluent HDFS3 Sink Connector plugin has a commercial license with a 30-day free trial.

In the my-kafka-values.yaml file there is an array field called extraDeploys. It defines some extra K8s resources that are deployed along with the Kafka release when you execute the helm install command of the previous section.
In particular, it deploys:

  • The connector as a Deployment.
  • A service to expose it.
  • A config map called my-kafka-connect-config with the configuration file.
  • A config map called my-kafka-connect-script with the code to launch the worker and create the HDFS connector.

The connector exposes some REST APIs for many purposes (debugging, creating/pausing/restarting connectors or tasks, listing the configs, etc.). Some examples that you can execute inside the connector container are:

  • Get the status of hdfs3-sink connector and its tasks:
    curl localhost:8083/connectors/hdfs3-sink/status | python -m json.tool
  • Restart the task with id 0 of the hdfs3-sink connector (there is no output if the command is successful):
    curl -X POST localhost:8083/connectors/hdfs3-sink/tasks/0/restart
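
The actual connector configuration ships in the my-kafka-connect-config ConfigMap, but the same REST API can also create or update a connector. As an illustration only, here is a hedged sketch of registering an hdfs3-sink connector from inside the connector container with Python; the property values (topic, HDFS URL, flush size) are assumptions and may differ from the ones actually used in this project:

    # Hypothetical sketch: create/update the hdfs3-sink connector via the
    # Kafka Connect REST API. The property values are illustrative only;
    # the real configuration lives in the my-kafka-connect-config ConfigMap.
    import json
    import urllib.request

    config = {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "utenti",                           # assumed topic name
        "hdfs.url": "hdfs://my-hdfs-namenodes:8020",  # namenode address of the HDFS release
        "flush.size": "3",                            # records per output file (illustrative)
    }

    req = urllib.request.Request(
        "http://localhost:8083/connectors/hdfs3-sink/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:
        print(json.loads(resp.read()))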

Enable Horizontal Scaling
It is possible to monitor the resource usage of the connector Deployment and trigger an automatic scale up/down accordingly.
To this end, you have to:

  1. Enable the Metrics API Server:

    In Minikube set-up: minikube addons enable metrics-server
    In real cluster set-up: kubectl apply -f ./metrics-api-server.yaml

  2. Create a Horizontal Pod Autoscaler:

    kubectl autoscale deployment my-kafka-connect --min=1 --max=3 --cpu-percent=80

5. Deploy the ML-Frontend equipped with Spark component on cluster (using Helm)

In this section we deploy a Jupyter release equipped with PySpark 3.1.1. This release uses the jupyter/pyspark-notebook image and creates a StatefulSet of one Pod that contains the frontend notebook.
This notebook is configured to run Spark on Kubernetes in client mode. This means that, when the user requests the Spark context creation, the desired number of executors is created (as Pods) and the Spark driver is launched in the same Pod as the notebook.
The executors use the lorenzopiazza/hera_sdg:spark-py_3.1.1-python3.8 image, a custom image that I created from the PySpark 3.1.1 image and made available on my Docker Hub.
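
As an illustration of what happens when the notebook creates the Spark context, here is a minimal client-mode configuration sketch; the master URL, namespace, driver host (the headless service created in a later step), driver port, and executor count are assumptions based on this deployment and may differ from the exact code in HeraSDG.ipynb:

    # Sketch of a client-mode Spark session on Kubernetes, created from the
    # notebook Pod. Names and values are assumptions, not the exact notebook code.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("HeraSDG")
        # Kubernetes API server as seen from inside the cluster.
        .master("k8s://https://kubernetes.default.svc:443")
        .config("spark.kubernetes.namespace", "default")
        # Custom executor image published on Docker Hub.
        .config("spark.kubernetes.container.image",
                "lorenzopiazza/hera_sdg:spark-py_3.1.1-python3.8")
        .config("spark.executor.instances", "2")   # desired number of executors
        # Headless service (created in a later step) so executors can reach the driver.
        .config("spark.driver.host", "my-jupyter-headless.default.svc.cluster.local")
        .config("spark.driver.port", "29413")      # any fixed, reachable port
        .getOrCreate()
    )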

  • Firstly, add the gradiant/ Helm repository to your local repository list:
    helm repo add gradiant https://gradiant.github.io/charts/
  • Then, deploy a gradiant/jupyter release on the cluster, providing the custom values in the file /ML-Frontend/my-jupyter-values.yaml:
    helm install my-jupyter gradiant/jupyter --version 0.1.6 -f ./ML-Frontend/my-jupyter-values.yaml

Using the gitNotebooks value, you can customize the release with an init container that downloads an entire GitHub repo (with your custom notebooks) and makes the notebooks available inside the Pod.

  • For driver-executor communication purposes, you also have to create a headless service that refers to the frontend Pod where the Spark driver executes:
    kubectl apply -f ./ML-Frontend/jupyter-headless-svc.yaml

  • Running Spark on K8s in client mode means the Spark driver runs inside the my-jupyter-jupyter-0 Pod and spawns the executors as Pods, so you have to grant this Pod the privileges to create and delete Pods.
    my-jupyter-jupyter-0 is already associated with the default:default service account, so bind that service account to the edit ClusterRole:
    kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=default:default --namespace=default

How to access the frontend

  1. Get the access token from the Jupyter server log:
    kubectl logs -f -n default svc/my-jupyter-jupyter

  2. Create a port-forward to the jupyter:
    kubectl port-forward -n default svc/my-jupyter-jupyter 8888:8888

  3. Then open the UI in your browser and use the access token:
    open http://localhost:8888

If you set up your own password, remember to restart the Jupyter server to apply the new configuration: File -> Shut Down.

  1. Upload the ./ML-Frontend/notebooks/HeraSDG.ipynb file.
  2. Grant access to the Spark UI:
    kubectl port-forward -n default my-jupyter-jupyter-0 4040:4040

Delete/Uninstall a Helm release

You can see all the deployed releases with the command:
helm list
Then you can choose to uninstall one of them with the command:
helm delete <release-name>
The command removes all the Kubernetes components associated with the chart and deletes the release, but doesn't delete the PVs and PVCs.

Useful Commands

  • Get the available Kubernetes objects
    kubectl get <nodes/pods/svc/sts/deployment/cm/pv/pvc>
  • Describe a specific pod
    kubectl describe pod <pod-name>
  • Consult a Pod's logs
    kubectl logs <pod-name>
  • Run a command inside a Pod
    kubectl exec --stdin --tty <pod-name> -- /bin/bash
  • Delete all Pods with a specific label
    kubectl delete pods -l spark-role=executor
  • Monitor resource usage
    kubectl top <nodes/pods>


Author: Lorenzo Piazza
