[BUG] Slow/no progress with cascaded pandas udfs/mapInPandas in Databricks #10770

Open
eordentlich opened this issue May 7, 2024 · 2 comments
Labels: bug (Something isn't working)

@eordentlich
Contributor

Describe the bug
Successively applied pandas UDFs and mapInPandas make little or no progress on Databricks.

Steps/Code to reproduce bug

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf

transformed_df = spark.range(1000000)

@pandas_udf("int")
def rand_label(col: pd.Series) -> pd.Series:
    import logging
    logger = logging.getLogger('rand_label')
    logger.info("in rand label")
    print("in rand label 1")
    return pd.Series(np.random.randint(0, 999, size=col.shape[0]))

@pandas_udf("int")
def rand_label2(col: pd.Series) -> pd.Series:
    import logging
    logger = logging.getLogger('rand_label')
    logger.info("in rand label")
    print("in rand label 2")
    return pd.Series(np.random.randint(0, 999, size=col.shape[0]))

transformed_df_w_label_2 = transformed_df.withColumn("label", rand_label(F.lit(0)))

The following is then problematic:

from pyspark.sql.types import StructType, StructField, DoubleType

transformed_df = spark.read.parquet("s3a://spark-rapids-ml-bm-datasets-public/pca/1m_3k_singlecol_float32_50_files.parquet")
features_col = 'feature_array'
prediction_col = 'label'
centers = np.random.rand(1000, 3000)

sc = transformed_df.rdd.context
centers_bc = sc.broadcast(centers)

def partition_score_udf(pdf_iter):
    local_centers = centers_bc.value.astype(np.float64)
    partition_score = 0.0
    import logging
    logger = logging.getLogger('partition_score_udf')
    logger.info("in partition score udf")
    for pdf in pdf_iter:
        print("in partition score udf")
        input_vecs = np.array(list(pdf[features_col]), dtype=np.float64)
        predictions = list(pdf[prediction_col])
        center_vecs = local_centers[predictions, :]
        partition_score += np.sum((input_vecs - center_vecs) ** 2)
    yield pd.DataFrame({"partition_score": [partition_score]})

total_score = (
    # The mapInPandas below is extremely slow. If it is applied to transformed_df_w_label
    # instead of transformed_df_w_label_2, it runs fine. One difference is that
    # transformed_df_w_label_2 is itself the output of another pandas UDF, so in this case
    # data passes back and forth between the JVM and Python workers multiple times.
    transformed_df_w_label_2.mapInPandas(
        partition_score_udf,  # type: ignore
        StructType([StructField("partition_score", DoubleType(), True)]),
    )
    .agg(F.sum("partition_score").alias("total_score"))
    .toPandas()
)  # type: ignore
total_score = total_score["total_score"][0]  # type: ignore

In this case, at least on Databricks 13.3 ML, the computation slows dramatically and may be deadlocked.
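
A control like the following can help isolate whether the cascade of Python stages is what triggers the slowdown: it produces the label column with built-in expressions instead of a pandas UDF and then runs the same mapInPandas stage. This is only an illustrative stand-in; the original transformed_df_w_label mentioned in the inline comment above may have been built differently.

# Hypothetical control: build the label column without a pandas UDF, so the only
# JVM <-> Python round trip is the mapInPandas stage itself.
control_df = transformed_df.withColumn(
    "label", F.floor(F.rand(seed=0) * 1000).cast("int")
)
control_score = (
    control_df.mapInPandas(
        partition_score_udf,
        StructType([StructField("partition_score", DoubleType(), True)]),
    )
    .agg(F.sum("partition_score").alias("total_score"))
    .toPandas()
)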

Expected behavior
No slowdowns, like with baseline Spark without the plugin.
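
For reference, a baseline comparison can be approximated on the same cluster by turning off the plugin's SQL acceleration for the session; spark.rapids.sql.enabled is the plugin's documented on/off switch. The session-level toggle below is just a sketch, and rerunning on a CPU-only cluster works as well.

# Disable RAPIDS SQL acceleration to compare against CPU Spark, then rerun the repro.
spark.conf.set("spark.rapids.sql.enabled", "false")
# ... rerun the cascaded pandas UDF + mapInPandas pipeline above ...
spark.conf.set("spark.rapids.sql.enabled", "true")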

Environment details (please complete the following information)

  • Environment location: the second example is slow only on Databricks 13.3 ML
  • Spark configuration settings related to the issue:
spark.task.resource.gpu.amount 1
spark.task.cpus 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.04.0.jar:/databricks/spark/python
spark.sql.files.minPartitionNum 2
spark.sql.execution.arrow.maxRecordsPerBatch 10000
spark.executor.cores 8
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.sql.cache.serializer com.nvidia.spark.ParquetCachedBatchSerializer
spark.rapids.memory.gpu.pooling.enabled false
spark.rapids.sql.explain ALL
spark.sql.execution.sortBeforeRepartition false
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.rapids.sql.batchSizeBytes 512m
spark.sql.adaptive.enabled false
spark.sql.execution.arrow.pyspark.enabled true
spark.sql.files.maxPartitionBytes 2000000000000
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

Cluster shape: 2x g5.2xlarge workers and a g4dn.xlarge driver
Additional context
Also, based on the print statement output in the logs, the first UDF appears to run to completion before the second one starts. Batches should instead flow through both Python UDFs incrementally, as they do with baseline Spark.
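
A minimal way to observe the interleaving (an illustrative sketch, not part of the original repro) is to timestamp the prints in each stage and compare them in the executor logs; with incremental flow the two markers should alternate rather than appear as two separate runs.

import time

@pandas_udf("int")
def rand_label_ts(col: pd.Series) -> pd.Series:
    # Timestamped marker for the first (pandas UDF) stage.
    print(f"rand_label batch at {time.time():.3f}")
    return pd.Series(np.random.randint(0, 999, size=col.shape[0]))

def score_ts(pdf_iter):
    for pdf in pdf_iter:
        # Timestamped marker for the second (mapInPandas) stage.
        print(f"mapInPandas batch at {time.time():.3f}, rows={len(pdf)}")
        yield pd.DataFrame({"n": [len(pdf)]})

(spark.range(1000000)
    .withColumn("label", rand_label_ts(F.lit(0)))
    .mapInPandas(score_ts, "n int")
    .collect())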

Might be related to: #10751

@eordentlich added the "? - Needs Triage" and "bug" labels on May 7, 2024
@mattahrens removed the "? - Needs Triage" label on May 7, 2024
@firestarman
Collaborator

Could you first try increasing the value of "spark.rapids.sql.concurrentGpuTasks" to see if we get any better perf?
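
For example, something like the following (the value 4 is only illustrative; the environment above uses 2, and depending on the Databricks runtime this key may need to be set in the cluster's Spark config rather than per session):

# Illustrative only: allow more tasks to hold the GPU concurrently.
spark.conf.set("spark.rapids.sql.concurrentGpuTasks", "4")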

@eordentlich
Contributor Author

The computation gets pretty much stuck with essentially no progress, so I don't think that will make a difference. Partial stack trace after reaching this point (might be from a similar but not identical example to this repro):


at sun.misc.Unsafe.copyMemory(Native Method)
at sun.misc.Unsafe.copyMemory(Unsafe.java:560)
at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:331)
at org.apache.spark.util.DirectByteBufferOutputStream.grow(DirectByteBufferOutputStream.scala:63)
at org.apache.spark.util.DirectByteBufferOutputStream.ensureCapacity(DirectByteBufferOutputStream.scala:49)
at org.apache.spark.util.DirectByteBufferOutputStream.write(DirectByteBufferOutputStream.scala:44)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
- locked <0x0000000768a99630> (a java.io.DataOutputStream)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1(GpuArrowWriter.scala:48)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.$anonfun$handleBuffer$1$adapted(GpuArrowWriter.scala:42)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter$$Lambda$3498/1244767780.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.BufferToStreamWriter.handleBuffer(GpuArrowWriter.scala:42)
at ai.rapids.cudf.Table.writeArrowIPCArrowChunk(Native Method)
at ai.rapids.cudf.Table.access$2000(Table.java:41)
at ai.rapids.cudf.Table$ArrowIPCTableWriter.write(Table.java:1739)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1(GpuArrowWriter.scala:99)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$write$1$adapted(GpuArrowWriter.scala:97)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3493/108528776.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write(GpuArrowWriter.scala:97)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.write$(GpuArrowWriter.scala:96)
at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.write(GpuArrowWriter.scala:144)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1(GpuArrowWriter.scala:93)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.$anonfun$writeAndClose$1$adapted(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter$$Lambda$3492/1674125626.apply(Unknown Source)
at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowWriter.writeAndClose$(GpuArrowWriter.scala:92)
at org.apache.spark.sql.rapids.execution.python.GpuArrowPythonWriter.writeAndClose(GpuArrowWriter.scala:144)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner$$anon$1.writeNextInputToStream(GpuArrowPythonRunner.scala:74)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:931)
at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:851)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0000000765602c48> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:71)
at org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonOutput$$anon$1.read(GpuArrowPythonOutput.scala:48)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:635)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
at com.nvidia.spark.rapids.GpuMergeAggregateIterator$$Lambda$3706/165795055.apply(Unknown Source)
