add unity catalog support (#1230)
* add unity catalog support

* fix closing bracket typo
atangwbd committed Mar 26, 2024
1 parent 6847ef9 commit 45e44af
Showing 3 changed files with 14 additions and 3 deletions.
BatchDataLoader.scala
@@ -97,7 +97,8 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
       }
       df
     } catch {
-      case _: Throwable =>
+      case e: Throwable =>
+        e.printStackTrace() // This prints the stack trace of the Throwable
         try {
           new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
         } catch {
@@ -106,6 +107,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
} catch {
case e: Exception =>
e.printStackTrace()
// If data loading from source failed, retry it automatically, as it might due to data source still being written into.
log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
if (retry > 0) {
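The change above replaces a discarded `case _: Throwable` with a named binding so the original failure is printed before the loader falls back to the next format. A minimal sketch of that pattern, with hypothetical loader names standing in for the real Avro/CSV fallbacks:

    import scala.util.control.NonFatal

    // Hypothetical stand-ins for the real format-specific loaders.
    def loadPrimary(path: String): String = throw new RuntimeException(s"cannot read $path")
    def loadFallback(path: String): String = s"loaded $path via fallback"

    def loadWithFallback(path: String): String =
      try {
        loadPrimary(path)
      } catch {
        case NonFatal(e) =>
          e.printStackTrace() // print the original failure instead of silently swallowing it
          loadFallback(path)
      }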
FileFormat.scala
@@ -26,6 +26,8 @@ object FileFormat {
   // Snowflake type
   val SNOWFLAKE = "SNOWFLAKE"
 
+  val UNITY_CATALOG = "UNITY_CATALOG"
+
   private val AVRO_DATASOURCE = "avro"
   // Use Spark native orc reader instead of hive-orc since Spark 2.3
   private val ORC_DATASOURCE = "orc"
@@ -47,6 +49,7 @@ object FileFormat {
       case p if p.endsWith(".avro") => AVRO
       case p if p.startsWith("jdbc:") => JDBC
       case p if p.startsWith("snowflake:") => SNOWFLAKE
+      case p if p.startsWith("unity:") => UNITY_CATALOG
       case _ =>
         // if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format specified by the user.
         if (ss.conf.get("spark.feathr.inputFormat","").nonEmpty) ss.conf.get("spark.feathr.inputFormat") else PATHLIST
@@ -85,6 +88,7 @@ object FileFormat {
       case p if p.endsWith(".avro") => AVRO
       case p if p.startsWith("jdbc:") => JDBC
       case p if p.startsWith("snowflake:") => SNOWFLAKE
+      case p if p.startsWith("unity:") => UNITY_CATALOG
       case _ =>
         // if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format specified by the user.
         dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
@@ -96,7 +100,7 @@ object FileFormat {
   def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {
 
     // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
-    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     val df = format match {
       case CSV =>
@@ -112,6 +116,11 @@ object FileFormat {
         JdbcUtils.loadDataFrame(ss, existingHdfsPaths.head)
       case SNOWFLAKE =>
         SnowflakeUtils.loadDataFrame(ss, existingHdfsPaths.head)
+
+      case UNITY_CATALOG =>
+        val pathHead = existingHdfsPaths.head
+        val unityCatalogTable = pathHead.replaceFirst("unity:", "")
+        ss.table(unityCatalogTable)
       case _ =>
         // Allow dynamic config of the file format if users want to use one
         if (ss.conf.getOption("spark.feathr.inputFormat").nonEmpty) ss.read.format(ss.conf.get("spark.feathr.inputFormat")).load(existingHdfsPaths: _*)
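Taken together, the two FileFormat hunks mean a path such as `unity:main.default.my_features` (a made-up three-level catalog.schema.table name) is detected by its `unity:` prefix and then loaded by stripping the prefix and reading the table through the SparkSession. A rough sketch, assuming an active session named `spark`:

    // Hypothetical Unity Catalog table name; `spark` is an active SparkSession.
    val path = "unity:main.default.my_features"

    // Prefix-based detection, as in FileFormat.getType
    val format = if (path.startsWith("unity:")) "UNITY_CATALOG" else "OTHER"

    // Loading, as in loadHdfsDataFrame: strip the prefix and read via the session catalog
    if (format == "UNITY_CATALOG") {
      val tableName = path.replaceFirst("unity:", "") // "main.default.my_features"
      val df = spark.table(tableName)
      df.show()
    }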
AclCheckUtils.scala
@@ -33,7 +33,7 @@ private[offline] object AclCheckUtils {
   // Check read authorization on a path string
   def checkReadAuthorization(conf: Configuration, pathName: String): Try[Unit] = {
     // no way to check jdbc auth yet
-    if (pathName.startsWith("jdbc:")) {
+    if (pathName.startsWith("jdbc:") || pathName.startsWith("unity:")) {
       Success(())
     } else {
       val path = new Path(pathName)
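Since `jdbc:` and now `unity:` locations are tables rather than filesystem paths, there is no HDFS ACL to inspect, and the check short-circuits to success. A stripped-down sketch of the shape of that check, without the Hadoop Configuration/Path machinery:

    import scala.util.{Success, Try}

    // Table-based sources have no filesystem path, so the ACL check trivially passes.
    def checkReadAuthorization(pathName: String): Try[Unit] =
      if (pathName.startsWith("jdbc:") || pathName.startsWith("unity:")) {
        Success(())
      } else {
        Try(()) // placeholder for the real HDFS ACL check on new Path(pathName)
      }

    assert(checkReadAuthorization("unity:main.default.t").isSuccess)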
