
spark-nlp in databricks writing to root s3 in cluster #14139

Open · kavyapraveen opened this issue Jan 18, 2024 · 1 comment
kavyapraveen commented Jan 18, 2024

Is there an existing issue for this?

  • I have searched the existing issues and did not find a match.

Who can help?

No response

What are you working on?

We are trying to check sentence similarity between two files. The full code we are using is included under Steps To Reproduce below.

Using Databricks Runtime 13.2, with Spark NLP installed from the Maven repository.
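(For reference, a typical setup sketch on Databricks is to attach two cluster libraries; the versions here are assumed to match the ones reported in the version section below:

Maven coordinate: com.johnsnowlabs.nlp:spark-nlp_2.12:5.2.2
PyPI package:     spark-nlp==5.2.2)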

Current Behavior

Currently the package throws an error because it attempts a PUT call against the root S3 bucket, which is not supported.
Py4JJavaError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.
: java.lang.ExceptionInInitializerError

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied; request: PUT https://***-prod-databricks-root.s3.us-east-1.amazonaws.com nvirginia-prod/2820278049549475/root/cache_pretrained/

Expected Behavior

The package should not throw Access Denied, or there should be a way to specify where the files are written.

Steps To Reproduce

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, md5, monotonically_increasing_id

# Spark NLP
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.base import *

# Spark ML imports come after the wildcard sparknlp imports so that
# pyspark.ml.feature.Normalizer is the one bound to the name Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer, Normalizer, BucketedRandomProjectionLSH

documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentence = SentenceDetector() \
    .setInputCols("document") \
    .setOutputCol("sentence") \
    .setExplodeSentences(False)

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

bertEmbeddings = BertEmbeddings \
    .pretrained("bert_base_cased", "en") \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("bert") \
    .setCaseSensitive(False) \
    .setPoolingLayer(0)

embeddingsSentence = SentenceEmbeddings() \
    .setInputCols(["sentence", "bert"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")

embeddingsFinisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_embeddings", "bert"]) \
    .setOutputCols("sentence_embeddings_vectors", "bert_vectors") \
    .setOutputAsVector(True) \
    .setCleanAnnotations(False)

explodeVectors = SQLTransformer() \
    .setStatement("SELECT EXPLODE(sentence_embeddings_vectors) AS features, * FROM __THIS__")

vectorNormalizer = Normalizer() \
    .setInputCol("features") \
    .setOutputCol("normFeatures") \
    .setP(1.0)

similarityChecker = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                                bucketLength=6.0, numHashTables=10)

pipeline = Pipeline().setStages([documentAssembler,
                                 sentence,
                                 tokenizer,
                                 bertEmbeddings,
                                 embeddingsSentence,
                                 embeddingsFinisher,
                                 explodeVectors,
                                 vectorNormalizer,
                                 similarityChecker])

# primaryCorpus and secondaryCorpus are DataFrames with a "text" column (not shown)
pipelineModel = pipeline.fit(primaryCorpus)
primaryDF = pipelineModel.transform(primaryCorpus)
secondaryDF = pipelineModel.transform(secondaryCorpus)

dfA = primaryDF.select("text", "features", "normFeatures") \
    .withColumn("lookupKey", md5("text")) \
    .withColumn("id", monotonically_increasing_id())
dfB = secondaryDF.select("text", "features", "normFeatures") \
    .withColumn("id", monotonically_increasing_id())

# stages[8] is the fitted BucketedRandomProjectionLSH model
pipelineModel.stages[8].approxSimilarityJoin(dfA, dfB, 100, distCol="distance") \
    .where(col("datasetA.id") == col("datasetB.id")) \
    .select(col("datasetA.text").alias("idA"),
            col("datasetB.text").alias("idB"),
            col("distance")).show()

Spark NLP version and Apache Spark

Spark 3.4.0
com.johnsnowlabs.nlp:spark-nlp_2.12:5.2.2

Type of Spark Application

No response

Java Version

No response

Java Home Directory

No response

Setup and installation

No response

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response

maziyarpanahi (Member) commented:
Hi,

You can set any location that you (your Spark app) have permission to read and write via the Spark NLP configuration: https://github.com/JohnSnowLabs/spark-nlp#spark-nlp-configuration

The config you need to set is cache_folder. By default it points to the user's home directory, and if that doesn't exist it falls back to /root. You can set it to a path where you have full permissions and models will be downloaded and loaded from there.
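As a minimal sketch (the dbfs:/tmp/cache_pretrained path used here is only an illustration; pick any location your cluster is allowed to write to): on Databricks this property is normally set in the cluster's Spark config (Compute > Advanced options > Spark), since it has to be in place before the Spark session is created:

spark.jsl.settings.pretrained.cache_folder dbfs:/tmp/cache_pretrained

Outside of a managed cluster config, the same setting can be passed while building the session yourself:

# Sketch: point Spark NLP's pretrained cache at a writable path before any
# .pretrained() call triggers a download. The path below is an example only.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP") \
    .config("spark.jsl.settings.pretrained.cache_folder", "dbfs:/tmp/cache_pretrained") \
    .getOrCreate()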
