Lineage missing in purview #194

Open
Kishor-Radhakrishnan opened this issue Apr 6, 2023 · 10 comments
Labels
bug (Something isn't working) · vnext (Resolved in Next Release)

Comments

@Kishor-Radhakrishnan

We have many cases where lineage is missing in Purview. We will keep this issue open and update it with logs to investigate.

```
23/04/05 08:27:02 ERROR EventEmitter: Could not emit lineage w/ exception
java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:476)
    at sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:470)
    at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
    at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1369)
    at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
    at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:978)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:280)
    at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
    at io.openlineage.spark.shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at io.openlineage.spark.shaded.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.jav
```

[log4j-active (7).txt](https://github.com/microsoft/Purview-ADB-Lineage-Solution-Accelerator/files/11166751/log4j-active.7.txt)

log4j-active (8).txt
Attached are two logs from Databricks containing these errors. Please investigate.

Kishor-Radhakrishnan added the bug (Something isn't working) label Apr 6, 2023
@Kishor-Radhakrishnan
Author

log4j-active (7).txt

@hmoazam
Contributor

hmoazam commented Apr 8, 2023

Hi @Kishor-Radhakrishnan, was this also fixed by the changes you made to fix #193, or do you still need us to look into it?

@Kishor-Radhakrishnan
Author

@hmoazam Yes, we still need you to look into it, as this error is different and we are seeing it frequently on DBR 10.4 LTS:

```
23/04/05 08:27:02 ERROR EventEmitter: Could not emit lineage w/ exception java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at
```

@hmoazam
Contributor

hmoazam commented Apr 16, 2023

Hi Kishor, could you please share what data sources you're reading from and writing to when you're getting this error?

@wjohnson
Collaborator

@Kishor-Radhakrishnan would you please try this updated branch to help us collect more information on this issue?

https://github.com/microsoft/Purview-ADB-Lineage-Solution-Accelerator/tree/hotfix/maxQueryPlanOLIn

Could you build and install this version and then run a notebook that is failing to send lineage even after our last fix?

If you then monitor the logs of the OpenLineageIn function, we should see:

  1. The OpenLineage payload being sent to Event Hubs.
  2. An error message indicating the event is too large.

If you can provide us that full OpenLineage payload and the error message, we can troubleshoot further and find a fix for this issue.
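
For reference, Event Hubs enforces a per-message size cap (roughly 1 MB on the Standard tier, smaller on Basic). A minimal sketch of checking an OpenLineage payload's size locally, assuming the event is available as a JSON-serializable dict; the constant and helper name here are illustrative, not part of the accelerator:

```python
import json

# Illustrative cap: Standard-tier Event Hubs allows roughly 1 MB per message;
# check your namespace tier for the exact limit.
EVENT_HUBS_MAX_BYTES = 1_000_000

def is_event_too_large(openlineage_event: dict) -> bool:
    """Return True if the serialized OpenLineage event exceeds the cap."""
    payload = json.dumps(openlineage_event).encode("utf-8")
    print(f"OpenLineage payload size: {len(payload)} bytes")
    return len(payload) > EVENT_HUBS_MAX_BYTES
```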

@Kishor-Radhakrishnan
Author

@wjohnson attaching a log containing this error, captured from the monitor:
openlineage.txt.zip

@wjohnson
Collaborator

@Kishor-Radhakrishnan thank you so much for this! Would you be able to share a snippet of code showing how you're reading the /mnt/lob_shippingcycle/active/gcss/value_added_service_booking/ path?

In this case, OpenLineage appears not to recognize the partitions of the date_part column as a single input table, which results in a large number of inputs and a large amount of column lineage metadata.
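
For context, the read pattern can matter here: loading the Delta table root should appear as a single input, while loading partition subdirectories directly can surface one input per date_part value. A hypothetical illustration (the exact read code is what we're asking about above):

```python
# Reading the table root: Delta handles partition discovery internally,
# so lineage should see one input dataset.
df = spark.read.format("delta").load(
    "/mnt/lob_shippingcycle/active/gcss/value_added_service_booking/"
)

# Reading a partition directory directly bypasses the table abstraction and
# can surface each date_part directory as a separate input.
df_one_part = spark.read.parquet(
    "/mnt/lob_shippingcycle/active/gcss/value_added_service_booking/date_part=2023-04-05/"
)
```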

I think there are two approaches to solving this.

  1. (Long Term) Understanding why OpenLineage isn't recognizing the date_part partitions as a single input table.
  2. (Short Term) Adding a feature to the Databricks to Purview Solution Accelerator that limits the size of the column lineage metadata.
    • We could add an app setting named maxColumnLineageSize that would behave similarly to maxQueryPlanSize; a rough sketch follows below.
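
A sketch of what that short-term limit could look like, dropping the columnLineage facet from an event's outputs when it grows past a configured size. The setting name mirrors the proposal above, and this is not the accelerator's actual implementation:

```python
import json

# Hypothetical setting, analogous to the proposed maxColumnLineageSize.
MAX_COLUMN_LINEAGE_BYTES = 50_000

def limit_column_lineage(event: dict) -> dict:
    """Strip oversized columnLineage facets while keeping dataset-level lineage."""
    for output in event.get("outputs", []):
        facets = output.get("facets", {})
        col_lineage = facets.get("columnLineage")
        if col_lineage is None:
            continue
        size = len(json.dumps(col_lineage).encode("utf-8"))
        if size > MAX_COLUMN_LINEAGE_BYTES:
            # Table-level inputs/outputs remain intact; only the per-column
            # mapping is removed for this output dataset.
            del facets["columnLineage"]
    return event
```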

@Kishor-Radhakrishnan
Author

@wjohnson I am adding a snippet of the code where the write occurs.

```python
def write_dataframe(df, tgt_path, part_col):
    try:
        if part_col != "NA":
            # Partitioned write: overwrite the target Delta path, partitioned by part_col.
            (
                df
                .write
                .format("delta")
                .mode("overwrite")
                .partitionBy(part_col)
                .option("mergeSchema", "true")
                .option("overwriteSchema", "true")
                .save(tgt_path)
            )
            print("Data successfully written as partitioned path")
        else:
            # Non-partitioned write.
            (
                df
                .write
                .mode("overwrite")
                .format("delta")
                .option("overwriteSchema", "true")
                .save(tgt_path)
            )
            print("Data successfully written as non partitioned path")
    except Exception as exception:
        logger.error("Error in write_dataframe function")
        raise Exception('Exception while calling function write_dataframe : ', exception)


def full_load(tgt_tbl, src_data_path, cntrl_tbl, tgt_delta_path, table_type, delete_flag, part_col, opt_flg):
    try:
        src_format = get_source_data_format(src_data_path)
        print("Source data format is {0} ".format(src_format))
        src_incremental_df = get_source_data_full(src_data_path, rownumber, delete_flag, src_format)
        print("source data created")
        write_dataframe(src_incremental_df, tgt_delta_path, part_col)
        # Compact the freshly written Delta table (the path must be backtick-quoted in SQL).
        spark.sql("optimize delta.`{target_data_path}`".format(target_data_path=tgt_delta_path))
        print("optimize command completed")
        spark.sql("""insert into {control_table} PARTITION (TABLE_NAME) select '{table_typ}','{target_table}',cast(current_date as timestamp),cast(current_date as string),'INITIAL LOAD' """.format(control_table=cntrl_tbl, target_table=tgt_tbl, table_typ=table_type))
        print("Successfully completed the full load for table {0}".format(tgt_tbl))
    except Exception as exception:
        logger.error("Error in full_load function")
        raise Exception('Exception while calling function full_load : ', exception)
```
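
For what it's worth, the failing path above would presumably hit the partitioned branch through a call along these lines (arguments inferred from the discussion; not the actual invocation):

```python
# Hypothetical call; src_incremental_df comes from get_source_data_full above.
write_dataframe(
    src_incremental_df,
    "/mnt/lob_shippingcycle/active/gcss/value_added_service_booking/",
    "date_part",
)
```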

@Kishor-Radhakrishnan
Author

Also, we face this issue with many different notebooks, so we need to understand why the partitioning is not being identified properly.

Maybe we can try the short-term solution of skipping column lineage to see whether it then captures the dataset-level lineage.

@wjohnson
Collaborator

We are adding the column lineage removal as part of the next release.

wjohnson added the vnext (Resolved in Next Release) label Dec 31, 2023