
NER pipeline not scaling up to use the full cluster nodes #14121

Open
mahmoudaymo opened this issue Jan 3, 2024 · 3 comments

mahmoudaymo commented Jan 3, 2024

Is there an existing issue for this?

  • I have searched the existing issues and did not find a match.

Who can help?

No response

What are you working on?

I am using Spark NLP for NER detection on an Azure Databricks cluster of 5 nodes. When the job runs, it does not scale up to use the full cluster; it uses only a single node, so it seems that the NER pipeline does not parallelize.

Current Behavior

The NER pipeline uses only one of the 5 available nodes.
[Screenshot CpuUsage: cluster CPU utilization, showing load on only one node]

Expected Behavior

The expected behavior is for the job to run on all the worker nodes.
[Screenshot CpuUsage2: expected cluster CPU utilization, with all worker nodes busy]

Steps To Reproduce

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.sql import DataFrame
from pyspark.ml import Pipeline
from pyspark.sql.functions import array_distinct, size, expr


def extract_and_replace_ner(
    ner_df: DataFrame,
    ner_model_name: str = "bert_base_token_classifier_ontonote",
):
    if "text" not in ner_df.columns:
        raise ValueError("NER DataFrame must contain a column named 'text'!")

    ner_df = ner_df.withColumnRenamed("text", "text_original")
    entities = ["PERSON", "LAW", "NORP", "FAC", "ORG", "GPE", "LOC", "PRODUCT"]

    documentAssembler = DocumentAssembler().setInputCol("text_original").setOutputCol("document")
    tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
    tokenClassifier = (
        BertForTokenClassification.pretrained(ner_model_name, "en")
        .setInputCols("document", "token")
        .setOutputCol("ner")
    )
    ner_converter = (
        NerConverter()
        .setInputCols(["document", "token", "ner"])
        .setOutputCol("ner_chunk")
        .setWhiteList(entities)
    )
    ner_pipeline = Pipeline(stages=[documentAssembler, tokenizer, tokenClassifier, ner_converter])
    ner_df = ner_pipeline.fit(ner_df).transform(ner_df)

    # keep only unique ners
    ner_df, no_ner_df = extract_unique_ners(ner_df)

    # replace ners in text
    ner_df = ner_df.rdd.mapPartitions(replace_text).toDF(
        ["control_number", "text_original", "text", "ners"]
    )

    return ner_df, no_ner_df


def extract_unique_ners(ner_df: DataFrame):
    ner_df = ner_df.withColumn("ners", ner_df.ner_chunk.result)
    ner_df = ner_df.select("control_number", "text_original", "ners")
    ner_df = ner_df.withColumn("ners", array_distinct("ners"))
    # keep only multi-word entities (single-token matches are dropped)
    ner_df = ner_df.withColumn(
        "ners", expr("filter(ners, x -> (size(split(x, ' ')) > 1))")
    )

    no_ner_df = ner_df.filter(size("ners") == 0)
    ner_df = ner_df.filter(size("ners") > 0)

    # replacement strings: entity tokens joined with underscores,
    # e.g. "John Doe" -> "John_Doe"
    ner_df = ner_df.withColumn(
        "replacements", expr("transform(ners, x -> concat_ws('_', split(x, ' ')))")
    )

    return ner_df, no_ner_df


def replace_text(partitionData):
    # mapPartitions worker: substitute each detected entity in the original
    # text with its underscored replacement
    updatedData = []
    for row in partitionData:
        text = row.text_original
        ners = row.ners
        reps = row.replacements
        for ner, rep in zip(ners, reps):
            text = text.replace(ner, rep)
        updatedData.append([row.control_number, row.text_original, text, ners])
    return iter(updatedData)
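
For reference, here is a quick check of whether the input itself is the bottleneck (a sketch, not part of the pipeline above; ner_df here is the DataFrame passed into extract_and_replace_ner):

# A sketch, not part of the original repro: if the input DataFrame holds all
# rows in a single partition (common when it comes from one small file),
# every pipeline stage runs on a single executor regardless of cluster size.
print("input partitions:", ner_df.rdd.getNumPartitions())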

Spark NLP version and Apache Spark

spark-nlp==5.1.4
spark==3.4.1
com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4

Working on Databricks

Type of Spark Application

Python Application

Java Version

8

Java Home Directory

/usr/lib/jvm/zulu8-ca-amd64/jre/

Setup and installation

Pypi

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response

@mahmoudaymo (Author) commented:

Any news here?

@maziyarpanahi (Member) commented:

I recommend watching this webinar; scaling Apache Spark is independent of Spark NLP. You should follow the general "tuning and sizing your cluster" advice in order to utilize all your executors.

https://www.johnsnowlabs.com/watch-webinar-speed-optimization-benchmarks-in-spark-nlp-3-making-the-most-of-modern-hardware/

Since Spark NLP is a native extension of Apache Spark, any general Spark recommendation applies to this library as well.
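
For example, these are the standard Spark properties that govern parallelism (a generic sketch, not Spark NLP-specific; on Databricks the executor count and cores come from the cluster configuration rather than application code):

from pyspark.sql import SparkSession

# A generic sketch of the knobs that govern parallelism (standard Spark
# properties; the right values depend on your cluster). On Databricks,
# executor instances and cores are set in the cluster UI, not in code.
spark = (
    SparkSession.builder
    .config("spark.executor.instances", "5")       # one executor per worker node
    .config("spark.executor.cores", "4")           # cores per executor
    .config("spark.default.parallelism", "20")     # executors x cores
    .config("spark.sql.shuffle.partitions", "20")  # partition count after shuffles
    .getOrCreate()
)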


mahmoudaymo commented Feb 5, 2024

Great! Thank you very much, Maziyar.

I have watched the webinar and came away with great insights. The issue here is not speed optimization but why the Spark NLP NER pipeline is not fully utilizing the cluster and uses only one worker. Once this issue is solved, I will work on optimizing the Spark application.
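
One common cause worth ruling out, as a sketch (assuming ner_df is the input DataFrame from the repro above): if all rows sit in a single partition, transform() runs on one executor no matter how many workers the cluster has.

# A sketch, assuming `ner_df` is the input DataFrame from the repro above.
# Repartition before fit()/transform() so the per-row annotation work is
# spread over all executors instead of a single one.
target = spark.sparkContext.defaultParallelism  # total cores across the cluster
if ner_df.rdd.getNumPartitions() < target:
    ner_df = ner_df.repartition(target)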
