
Support metric to monitor import lag #179

Open
cl-a-us opened this issue Aug 2, 2022 · 8 comments


cl-a-us commented Aug 2, 2022

We need a metric to measure the import lag while processing messages from a JDBC source. We were wondering whether it is possible to add further metrics to the source connector so that they can be exposed alongside the Kafka Connect Prometheus metrics.
Our goal: especially during the first/initial import of database rows, we want to monitor how many rows still need to be imported and have not been processed yet.

There is a metric that displays the number of polled rows (e.g. source-record-poll-total). This metric represents the number of rows which the connector has already processed/read from the database. However, one cannot determine how many unpolled rows are left in the database and still need to be read.

I am new to Kafka Connect plugin development. First of all: is it possible to add such a metric to this Kafka Connect plugin? Second, if we implemented that metric, would you merge the pull request into your code base?


mstein11 commented Aug 3, 2022

This would be important to us too. Is there anybody here who can answer a few questions regarding how this can be achieved? After that we would be open to implementing it ourselves and merging it back here.

@jlprat, @ftisiot


Ugbot commented Aug 3, 2022

Hiya!
Thanks for commenting, we're always looking for insight into usage patterns and extra value we can add.

However, there's only one answer in software engineering: "it depends". So here are some thoughts & questions.

Key questions for this are about expectations of usage, and what you're looking to use this on.

Effectively you'd be running a count in addition to the query that fetches up to the MAX row per poll, then submitting this metric as well as storing it. Assuming it's just for the initial spool-up, this could simply be a count run on startup of the task; you'd then more or less see the lag running down to zero over time, and that would be relatively easy to manage, if a touch awkward.
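
For illustration, a rough sketch of that count-on-startup idea in plain JDBC (the query text, connection details, and class name here are assumptions for the example, not connector code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class InitialCountSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical: the query the connector is configured with.
        String configuredQuery = "SELECT id, payload FROM orders";

        // Wrap the configured query in a COUNT(*) so we count exactly the rows
        // the connector will eventually poll. Run once at task startup.
        String countQuery = "SELECT COUNT(*) FROM (" + configuredQuery + ") AS t";

        try (Connection conn = DriverManager.getConnection(
                 "jdbc:postgresql://localhost/db", "user", "pass");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(countQuery)) {
            rs.next();
            long rowsToImport = rs.getLong(1);
            // This value is the initial "import lag"; it only shrinks as
            // source-record-poll-total catches up with it.
            System.out.println("Rows still to import at startup: " + rowsToImport);
        }
    }
}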

BUT, if you are running fast enough that you expect the connector to get behind a lot, it's likely that you'd be better served with something that reads directly from the write-ahead log rather than querying.
The problem here is that it is rather hard to quantify the cost of running this, as the count could scan a LOT of rows/tables depending on the size of the query. Also, as the tables can change at any time, there are a lot of assumptions that would have to be built in, making it rather specific to a mostly append-only table/view.

Questions:

  • Is this a continuous metric or a startup metric?
  • Is this a streaming task or something more batch/ETL based?
  • How complex is your query?


cl-a-us commented Aug 4, 2022

We plan to use such a metric for an initial spool-up. After all rows are imported, other processes in our application landscape can start processing. In other words, other processes must wait for the initial spool-up to finish. Our main goal is to recognize that the spool-up is finished.

Technically, it would be a count on the database. If I understand you correctly, you suggest implementing the count query during jdbcSourceTask.start(…) or jdbcSourceTask.poll(…), depending on whether the count must be updated or not.
How would you store the metric? Is it possible within a Kafka connector to define custom metrics that are forwarded to Kafka Connect and exposed to external systems via Prometheus? Or would you suggest another way to provide the metric to external systems?
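
For illustration, a minimal sketch of how a custom JMX metric could be registered from plain Java, assuming a hypothetical InitialImportCountMBean interface and object name (none of this is taken from the connector's code base):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class ImportLagMetric {

    // Hypothetical standard MBean interface exposing one read-only attribute.
    public interface InitialImportCountMBean {
        long getInitialImportCount();
    }

    public static void register(long rowCount, String taskId, String topic) throws Exception {
        InitialImportCountMBean bean = () -> rowCount;  // simple read-only value

        // Hypothetical object name; task and topic become key properties that a
        // jmxPrometheusExporter rule can later turn into Prometheus labels.
        ObjectName name = new ObjectName(
            "io.aiven.connect.jdbc.initialImportCount:task=\"" + taskId
                + "\",topic=\"" + topic + "\"");

        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // StandardMBean lets us register the implementation against an explicit
        // management interface, so something like this could run in a task's start().
        server.registerMBean(new StandardMBean(bean, InitialImportCountMBean.class), name);
    }

    public static void main(String[] args) throws Exception {
        register(123_456L, "0", "orders");
        System.out.println("MBean registered; inspect it with jconsole or a JMX exporter.");
        Thread.sleep(60_000L);  // keep the JVM alive so the bean can be read
    }
}

With something like that in place, an existing jmxPrometheusExporter setup should only need one more rule to scrape the new bean.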

To answer your questions:

  • Continuous vs startup metric: In our use case we would be fine with a startup metric. We do not expect the number of rows to grow over time.
  • Streaming vs batch/ETL task: We would like to use the additional metric for monitoring an initial load of data. So I would say our use case is more batch/ETL based.
  • Query complexity: Our queries are rather complex. We use multiple joins and it takes the database a while to execute those queries.


Ugbot commented Aug 9, 2022

Short version:
We could more or less do either; adding it to the metrics would be easiest, but it's then on you to store/edit it.
Personally I'd store it and issue an update per task/poll, allowing you to see the updates.


cl-a-us commented Aug 10, 2022

Hey @Ugbot,

When you talk about metrics, do you mean a metric that is published via a JMX MBean? Or is there another way to provide metrics? I'm not sure I got what you mean by 'metrics'.
Does your second proposal refer to a general metrics topic that is provided by Kafka Connect?
If I understand you correctly, we would prefer the first option. We already use a jmxPrometheusExporter to extract the values and store them in our Prometheus. Storing a further value would just be another config line for the exporter.


Ugbot commented Aug 10, 2022

Hiya!
Sorry, that last post seems to have gotten truncated a bit; some of the context was lost.
Firstly, I seem to be using log vs. metric interchangeably. What is the ideal format for you?
It would seem to be JMX from your comments.

Storing this data is a bit of an issue: you could put it into your DB, but from the description of the task it does not actually need to be stored. The table is not going to change, so having to do the count again on task startup should not be a huge hit. I'd literally keep it in memory and just have it published with the task poll logging.
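
For illustration, a minimal in-memory sketch of that idea (the class, field names, and SLF4J logging are assumptions, not connector code):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical in-memory holder for the startup count; nothing is persisted.
public class ImportLagLogger {
    private static final Logger log = LoggerFactory.getLogger(ImportLagLogger.class);

    private final long rowsAtStartup;  // result of the COUNT(*) run once in start()
    private long rowsPolledSoFar;      // grows as records are returned from poll()

    public ImportLagLogger(long rowsAtStartup) {
        this.rowsAtStartup = rowsAtStartup;
    }

    // Called after each poll with the number of records just produced.
    public void onPoll(int recordsInBatch) {
        rowsPolledSoFar += recordsInBatch;
        long remaining = Math.max(0L, rowsAtStartup - rowsPolledSoFar);
        log.info("Initial import lag: {} of {} rows still to import", remaining, rowsAtStartup);
    }
}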

I'll need to dig into the JMX stuff a bit to see if I can confirm this will work as expected....


cl-a-us commented Sep 5, 2022

Hi @Ugbot,

I did some programming and provided a solution in a pull request!
As a first step, my solution just executes an initial Select count(*) ... on startup with the given query or tables. It then exposes that metric via a JMX endpoint. All other Kafka Connect metrics are provided via JMX export as well. I just needed to add another jmxPrometheusExporter rule to expose the new metric via Prometheus:

- pattern: io.aiven.connect.jdbc.initialImportCount<task="(.*)", topic="(.*)">
  name: aiven_startup_count
  labels:
    topic: "$2"
    task: "$1"
  help: "Kafka JMX metric initialImportCount"
  type: COUNTER
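
For illustration, one way to sanity-check such a bean locally from inside the same JVM before wiring up Prometheus (the attribute name InitialImportCount is an assumption; the actual attribute exposed by the pull request may differ):

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ReadImportCount {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Match every bean in the domain the exporter rule above points at.
        ObjectName pattern = new ObjectName("io.aiven.connect.jdbc.initialImportCount:*");
        for (ObjectName name : server.queryNames(pattern, null)) {
            // Assumed attribute name; adjust to whatever the MBean actually exposes.
            Object value = server.getAttribute(name, "InitialImportCount");
            System.out.println(name + " -> " + value);
        }
    }
}

The task and topic key properties in the object name are what the $1/$2 capture groups in the rule above map to the task and topic labels on aiven_startup_count.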

Is there anything I can do to speed up the merge process?


cl-a-us commented Oct 7, 2022

Hey @Ugbot,

I’d like to kindly ask whether there is any feedback on the code review yet?
It would be great to see some progress.
