Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Kafka/AVRO output bot. #1251

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from

Conversation

z0r0
Copy link

@z0r0 z0r0 commented Jun 4, 2018

This is my first output bot contribution, and should be considered a work in progress.

This output bot goes a bit farther than simply outputting all threat intel to a kafka topic. The idea is that the intelligence can get routed to a topic that's dedicated to its source intelligence type. I'm opening this PR to solicit feedback on ironing out this bot.

@z0r0
Copy link
Author

z0r0 commented Jun 4, 2018

First of my questions: in order to make avro work, i've got to define an avro schema. For IntelMQ, this is a large data type mapping that should be stored as a configuration template. An example can be seen Here.

Another thing to note is that for output, especially to avro, fields need to be flattened, and all . chars need to be removed from field names, so these field names are statically defined in avro as _'s. I'd be curious about any thoughts about the liberties taken with regards to field renaming + flattening for this use case.

@codecov-io
Copy link

codecov-io commented Jun 4, 2018

Codecov Report

Merging #1251 into develop will decrease coverage by 0.32%.
The diff coverage is 25%.

@@             Coverage Diff             @@
##           develop    #1251      +/-   ##
===========================================
- Coverage    75.14%   74.81%   -0.33%     
===========================================
  Files          260      261       +1     
  Lines        12131    12211      +80     
  Branches      1623     1637      +14     
===========================================
+ Hits          9116     9136      +20     
- Misses        2662     2721      +59     
- Partials       353      354       +1
Impacted Files Coverage Δ
intelmq/bots/outputs/kafka/output.py 25% <25%> (ø)

@ghost ghost self-assigned this Jun 5, 2018
@ghost ghost self-requested a review June 5, 2018 09:31
@ghost ghost added this to the 1.1.0 milestone Jun 5, 2018
@ghost ghost added feature request Indicates new feature requests component: bots labels Jun 5, 2018
@ghost
Copy link

ghost commented Jun 5, 2018

First of my questions: in order to make avro work, i've got to define an avro schema. For IntelMQ, this is a large data type mapping that should be stored as a configuration template. An example can be seen Here.

Where's the question? :D

Another thing to note is that for output, especially to avro, fields need to be flattened, and all . chars need to be removed from field names, so these field names are statically defined in avro as _'s. I'd be curious about any thoughts about the liberties taken with regards to field renaming + flattening for this use case.

This is also the case for the elastic search bot, where a parameter has been used to specify the replacement character and _ was the default. (Not necessary any more in ES, see #1188)

@z0r0
Copy link
Author

z0r0 commented Jun 5, 2018

So the question is simply, where should i put the files in the source so they can be referenced? Should i add them in contrib? For my instance i put config files in /opt/intelmq/var/lib/bots/kafka-output/[key_file.avsc, topics.conf, value_file.avsc]

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

except ImportError:
avro = None

if avro is not None:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this and the two lines above be removed?

except ImportError:
    from confluent_kafka.avro import AvroProducer

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just using this to throw better errors in init() for external dependencies. I can remove them if you'd like.

avro = None

if avro is not None:
from confluent_kafka.avro import AvroProducer
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for a different library version or why is it necessary? Please document the reason inline.

And this line is not in a try-except block.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this again for better error handling as i was writing this. It can probably be removed as there's multiple python packages which fit within the kafka namespace. many of them have Producer class definitions, but no avro class definitions. confluent_kafka is a more specific namespace.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this again for better error handling as i was writing this.

But an ImportError is not catched here.

if self.enable_avro is False:
self.producer.produce(self.kafka_topic, dumps(event_dict).encode('utf-8'), callback=self.delivery_report)
self.acknowledge_message()
self.kafka.flush()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is self.kafka defined?

self.logger.debug('Shipped %s to topic: %s', format(submit_key), format(event_topic))
self.producer.produce(topic=event_topic, value=event_dict, key=submit_key)
self.acknowledge_message()
self.producer.poll(0)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better do this check before acknowledging and raise an error if that happened so the message does not get lost.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like the AvroProducer has much in terms of error handling, looking at it now, i don't think that poll() does anything. I think i'm just going to have to catch serialization errors on produce.

Here's a link

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a message could not be successfully delivered, self.acknowledge() should not be called. For error handling (and logging), an Exception should be raised.

@ghost
Copy link

ghost commented Jun 21, 2018

AFAIU this is still WIP, right?

Changing the Milestone then to the next version, 1.1.0 is now coming soon.

@ghost ghost modified the milestones: 1.1.0, 1.2.0 Jun 21, 2018
@z0r0
Copy link
Author

z0r0 commented Jun 21, 2018 via email

@ghost ghost added needs: feedback and removed needs: feedback labels Apr 5, 2019
@ghost ghost modified the milestones: 1.2.0, 2.0.0 Apr 9, 2019
@ghost ghost marked this pull request as draft May 14, 2020 11:42
@ghost
Copy link

ghost commented Apr 8, 2021

@z0r0 are you still working on this? The PR is in a pretty good shape and it would be a pity if we'd not continue the work.

@z0r0
Copy link
Author

z0r0 commented Apr 30, 2021

Hello, we ended up not implementing it and went with something else at work. That being said, I'm going to finish this one up on my own time in the coming weeks, so stay tuned.

@ghost
Copy link

ghost commented May 3, 2021

Hello, we ended up not implementing it and went with something else at work. That being said, I'm going to finish this one up on my own time in the coming weeks, so stay tuned.

Sorry to hear that and thanks for your offer to finish it! Please let us know how we can support you.

@ghost ghost removed their assignment Aug 20, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants