Increasing trigger frequency #657

Open
AlexisBRENON opened this issue Oct 9, 2023 · 0 comments
Labels
api: pubsublite Issues related to the googleapis/java-pubsublite-spark API.

@AlexisBRENON

Hi,

I am trying to use this library and would like to do some debugging. For that, I would like the processing code to run frequently (to check the sink results, etc.). However, the minimum trigger period seems to be 1 minute.
The code below triggers a sink call (printing to the terminal) once per minute, as the output shows.
Adding `.trigger(processingTime="15 seconds")` produces output every 1 minute and 15 seconds instead...
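For reference, the Spark Structured Streaming programming guide describes processing-time triggers as follows: if a micro-batch completes within the interval, the engine waits until the interval is over before starting the next one; if it takes longer, the next micro-batch starts as soon as the previous one completes. The sketch below is a simplified model of that rule, not Spark code; `trigger_start_times` is a hypothetical helper that takes assumed batch durations in seconds. Under this model, a 15-second trigger cannot shorten the period if each micro-batch itself takes about a minute.

```python
import math
from typing import List


def trigger_start_times(interval_s: float, durations_s: List[float]) -> List[float]:
    """Simplified model of a processing-time trigger (hypothetical helper).

    Interval boundaries fall at multiples of interval_s. A batch that
    finishes early waits for the next boundary; a batch that overruns is
    followed immediately by the next batch.
    """
    starts: List[float] = []
    t = 0.0
    for duration in durations_s:
        starts.append(t)
        end = t + duration
        # First interval boundary strictly after this batch's start.
        next_boundary = (math.floor(t / interval_s) + 1) * interval_s
        t = float(max(end, next_boundary))
    return starts


# Short batches follow the 15 s grid; one 60 s batch delays the next start.
print(trigger_start_times(15, [5, 5, 60, 5]))  # [0.0, 15.0, 30.0, 90.0]
```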

Is there a way to get output every 15 seconds?

Kind regards

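```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Read from a Pub/Sub Lite subscription and aggregate messages into
# 15-second event-time windows, allowing 3 seconds of late data.
input_ = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        "...",
    )
    .load()
    .withWatermark("publish_timestamp", "3 seconds")
    .groupBy(F.window("publish_timestamp", "15 seconds"))
    .agg(
        F.min("publish_timestamp"),
        F.max("publish_timestamp"),
        F.min("offset"),
        F.max("offset"),
        F.count("*"),
    )
    # Timestamp of the micro-batch that produced each row.
    .withColumn("ts", F.current_timestamp())
)

# Print every micro-batch to the console. No .trigger() is set, so the
# default trigger starts a new micro-batch as soon as the previous one ends.
query = input_.writeStream.format("console").option("truncate", False).start()

query.awaitTermination()
```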
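The `groupBy(F.window("publish_timestamp", "15 seconds"))` step above assigns each message to a tumbling 15-second window. As far as I know, Spark aligns these windows to the Unix epoch when no `startTime` is given. The hypothetical helper `tumbling_window` below is a minimal pure-Python sketch of that assignment, handy for checking which window a `publish_timestamp` in the output should fall into. It uses naive datetimes; because any whole-minute time-zone offset is a multiple of 15 seconds, the displayed local times give the same 15-second boundaries.

```python
from datetime import datetime, timedelta
from typing import Tuple

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts: datetime, size: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window containing ts (hypothetical helper).

    Windows are aligned to the Unix epoch, which is assumed to mirror
    F.window's default startTime of 0.
    """
    # Subtract the time elapsed since the most recent window boundary.
    start = ts - (ts - EPOCH) % size
    return start, start + size


# min(publish_timestamp) of the first row of Batch 1 in the output.
start, end = tumbling_window(datetime(2023, 10, 9, 12, 26, 19, 692721), timedelta(seconds=15))
print(start, end)  # 2023-10-09 12:26:15 2023-10-09 12:26:30
```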
```text
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------------+----------------------+-----------+-----------+--------+---+
|window|min(publish_timestamp)|max(publish_timestamp)|min(offset)|max(offset)|count(1)|ts |
+------+----------------------+----------------------+-----------+-----------+--------+---+
+------+----------------------+----------------------+-----------+-----------+--------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|window                                    |min(publish_timestamp)    |max(publish_timestamp)    |min(offset)|max(offset)|count(1)|ts                     |
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|{2023-10-09 12:26:15, 2023-10-09 12:26:30}|2023-10-09 12:26:19.692721|2023-10-09 12:26:29.948128|5464       |5532       |69      |2023-10-09 12:27:39.789|
|{2023-10-09 12:26:45, 2023-10-09 12:27:00}|2023-10-09 12:26:45.207421|2023-10-09 12:26:59.406092|5596       |5664       |69      |2023-10-09 12:27:39.789|
|{2023-10-09 12:26:30, 2023-10-09 12:26:45}|2023-10-09 12:26:30.952389|2023-10-09 12:26:44.192683|5533       |5595       |63      |2023-10-09 12:27:39.789|
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+----------------------+----------------------+-----------+-----------+--------+---+
|window|min(publish_timestamp)|max(publish_timestamp)|min(offset)|max(offset)|count(1)|ts |
+------+----------------------+----------------------+-----------+-----------+--------+---+
+------+----------------------+----------------------+-----------+-----------+--------+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|window                                    |min(publish_timestamp)    |max(publish_timestamp)    |min(offset)|max(offset)|count(1)|ts                     |
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|{2023-10-09 12:27:45, 2023-10-09 12:28:00}|2023-10-09 12:27:45.252517|2023-10-09 12:27:59.607855|5896       |6000       |105     |2023-10-09 12:28:37.213|
|{2023-10-09 12:28:15, 2023-10-09 12:28:30}|2023-10-09 12:28:15.905554|2023-10-09 12:28:29.104255|6088       |6147       |60      |2023-10-09 12:28:37.213|
|{2023-10-09 12:27:30, 2023-10-09 12:27:45}|2023-10-09 12:27:37.273949|2023-10-09 12:27:44.23746 |5710       |5895       |186     |2023-10-09 12:28:37.213|
|{2023-10-09 12:27:00, 2023-10-09 12:27:15}|2023-10-09 12:27:00.416084|2023-10-09 12:27:07.588377|5665       |5709       |45      |2023-10-09 12:28:37.213|
|{2023-10-09 12:28:00, 2023-10-09 12:28:15}|2023-10-09 12:28:00.622214|2023-10-09 12:28:14.896156|6001       |6087       |87      |2023-10-09 12:28:37.213|
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+----------------------+----------------------+-----------+-----------+--------+---+
|window|min(publish_timestamp)|max(publish_timestamp)|min(offset)|max(offset)|count(1)|ts |
+------+----------------------+----------------------+-----------+-----------+--------+---+
+------+----------------------+----------------------+-----------+-----------+--------+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|window                                    |min(publish_timestamp)    |max(publish_timestamp)    |min(offset)|max(offset)|count(1)|ts                     |
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|{2023-10-09 12:28:45, 2023-10-09 12:29:00}|2023-10-09 12:28:45.473525|2023-10-09 12:28:59.785851|6253       |6339       |87      |2023-10-09 12:29:37.296|
|{2023-10-09 12:29:00, 2023-10-09 12:29:15}|2023-10-09 12:29:00.791034|2023-10-09 12:29:14.027443|6340       |6408       |69      |2023-10-09 12:29:37.296|
|{2023-10-09 12:29:15, 2023-10-09 12:29:30}|2023-10-09 12:29:15.03666 |2023-10-09 12:29:29.240616|6409       |6471       |63      |2023-10-09 12:29:37.296|
|{2023-10-09 12:28:30, 2023-10-09 12:28:45}|2023-10-09 12:28:30.114606|2023-10-09 12:28:44.463761|6148       |6252       |105     |2023-10-09 12:29:37.296|
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+----------------------+----------------------+-----------+-----------+--------+---+
|window|min(publish_timestamp)|max(publish_timestamp)|min(offset)|max(offset)|count(1)|ts |
+------+----------------------+----------------------+-----------+-----------+--------+---+
+------+----------------------+----------------------+-----------+-----------+--------+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|window                                    |min(publish_timestamp)    |max(publish_timestamp)    |min(offset)|max(offset)|count(1)|ts                     |
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
|{2023-10-09 12:29:30, 2023-10-09 12:29:45}|2023-10-09 12:29:30.251257|2023-10-09 12:29:44.538427|6472       |6552       |81      |2023-10-09 12:30:38.172|
+------------------------------------------+--------------------------+--------------------------+-----------+-----------+--------+-----------------------+
```
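To see where each trigger spends its time, the query's progress reports can be inspected alongside the console output. In the PySpark 3.x versions I know, `query.recentProgress` returns recent progress reports as dicts with `batchId`, `timestamp` (ISO-8601 in UTC, ending in `Z`) and `durationMs` fields, where `durationMs["latestOffset"]` is roughly the time spent asking the source for its latest offsets. The hypothetical helper `trigger_report` below assumes that shape and lists the gap between consecutive trigger starts next to those durations, which could show whether the ~1-minute period comes from the source or from batch execution.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# (batchId, seconds since previous trigger, latestOffset ms, triggerExecution ms)
Row = Tuple[int, Optional[float], Optional[int], Optional[int]]


def parse_progress_ts(value: str) -> datetime:
    # Assumed format, e.g. "2023-10-09T12:27:39.789Z" (UTC).
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")


def trigger_report(progress: List[Dict[str, Any]]) -> List[Row]:
    """Hypothetical helper: summarise trigger timing from progress dicts."""
    rows: List[Row] = []
    previous: Optional[datetime] = None
    for report in progress:
        started = parse_progress_ts(report["timestamp"])
        # No gap can be computed for the first report in the list.
        gap = (started - previous).total_seconds() if previous is not None else None
        durations = report.get("durationMs", {})
        rows.append((
            report["batchId"],
            gap,
            durations.get("latestOffset"),
            durations.get("triggerExecution"),
        ))
        previous = started
    return rows


# Usage, while the query is running (e.g. from another thread, or in place
# of query.awaitTermination() after letting the query run for a while):
#     for row in trigger_report(query.recentProgress):
#         print(row)
```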
product-auto-label bot added the api: pubsublite label on Oct 9, 2023