Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
397582b
commit 582aab9
Showing
8 changed files
with
87 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import * as KafkaJS from 'kafkajs'; | ||
|
||
export interface MicroserviceMessage<BodyType> { | ||
messageSchema: any; | ||
getMessageName(): string; | ||
fullyQualifiedName(): string; | ||
} | ||
|
||
|
||
export class KafkaClient { | ||
kafka: KafkaJS.Kafka; | ||
producer: KafkaJS.Producer; | ||
protected isReady = false; | ||
|
||
/** | ||
* Instantiates a new Kafka client | ||
* | ||
* @param brokers - List of brokers to connect to | ||
* @param kafkaConfig - Kafka client configuration | ||
*/ | ||
constructor(kafkaConfig: KafkaJS.KafkaConfig) { | ||
this.kafka = new KafkaJS.Kafka({ | ||
clientId: 'firebase-auth-producer', | ||
...kafkaConfig, | ||
}); | ||
this.producer = this.kafka.producer(kafkaConfig) | ||
} | ||
|
||
async init() { | ||
if(this.ready()) | ||
await this.producer.connect(); | ||
} | ||
|
||
/** | ||
* Returns true if the Producer is ready to produce messages | ||
*/ | ||
public ready(): boolean { | ||
return this.isReady; | ||
} | ||
|
||
async disconnect() { | ||
await this.producer.disconnect(); | ||
} | ||
|
||
async send<T>( | ||
topicName: string, | ||
payload: T, | ||
): Promise<KafkaJS.RecordMetadata[]> { | ||
const result = await this.producer.send({ | ||
topic: topicName, | ||
messages: [ | ||
{ | ||
value: JSON.stringify(payload), | ||
}, | ||
], | ||
}); | ||
return result; | ||
} | ||
} |
Empty file.
Empty file.
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,15 @@ | ||
interface Configuration { | ||
broker: string; | ||
topic: string; | ||
fieldsToInclude: string[]; | ||
location: string; | ||
} | ||
|
||
const config: Configuration = { | ||
fieldsToInclude: process.env.FIELDS_TO_INCLUDE!.split(','), | ||
location: process.env.FUNCTIONS_LOCATION!, | ||
broker: process.env.KAFKA_BROKER!, | ||
topic: process.env.KAFKA_TOPIC! | ||
}; | ||
|
||
export default config; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters