-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkafka-stream-readable.js
73 lines (73 loc) · 2.08 KB
/
kafka-stream-readable.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.KafkaStreamReadable = void 0;
const stream_1 = require("stream");
/**
 * Object-mode Readable stream that pulls messages from a Kafka consumer.
 *
 * Each time the stream machinery asks for more data, one message is requested
 * from the wrapped consumer via `recv()` and pushed downstream. A falsy result
 * from `recv()` is treated as end-of-stream.
 *
 * @extends Readable
 */
class KafkaStreamReadable extends stream_1.Readable {
    /**
     * Creates a KafkaStreamReadable instance.
     *
     * @param {object} kafkaConsumer - Consumer to wrap. This class calls its
     *   `recv`, `subscribe`, `unsubscribe`, `seek` and `commit` methods.
     * @throws {Error} If `kafkaConsumer` is missing (falsy).
     */
    constructor(kafkaConsumer) {
        // Validate first so no stream is constructed for an unusable argument.
        if (!kafkaConsumer) {
            throw new Error('A valid KafkaConsumer instance is required.');
        }
        super({ objectMode: true });
        this.kafkaConsumer = kafkaConsumer;
    }
    /**
     * Subscribes the wrapped consumer to one or more topics.
     *
     * @param {string|Array} topics - Non-empty topic name or non-empty array;
     *   passed unchanged to the consumer's `subscribe`.
     * @returns {Promise<void>} Resolves once the consumer's `subscribe` settles.
     * @throws {Error} (as a rejection) If `topics` is falsy or an empty array.
     */
    async subscribe(topics) {
        const isEmptyList = Array.isArray(topics) && topics.length === 0;
        if (!topics || isEmptyList) {
            throw new Error('Topics must be a non-empty string or array.');
        }
        await this.kafkaConsumer.subscribe(topics);
    }
    /**
     * Forwards a seek request to the wrapped consumer.
     *
     * All arguments are passed through unchanged; their exact meaning is
     * defined by the consumer implementation, which is not visible here.
     *
     * @returns {*} Whatever the consumer's `seek` returns. If that is a
     *   Promise, callers can await it to observe completion or failure.
     */
    seek(topic, partition, offsetModel, timeout) {
        return this.kafkaConsumer.seek(topic, partition, offsetModel, timeout);
    }
    /**
     * Forwards a commit request to the wrapped consumer.
     *
     * All arguments are passed through unchanged; their exact meaning is
     * defined by the consumer implementation, which is not visible here.
     *
     * @returns {*} Whatever the consumer's `commit` returns. If that is a
     *   Promise, callers can await it to observe completion or failure.
     */
    commit(topic, partition, offset, commit) {
        return this.kafkaConsumer.commit(topic, partition, offset, commit);
    }
    /**
     * Unsubscribes the wrapped consumer from its topics.
     *
     * @returns {*} Whatever the consumer's `unsubscribe` returns.
     */
    unsubscribe() {
        return this.kafkaConsumer.unsubscribe();
    }
    /**
     * Returns the raw Kafka consumer.
     *
     * @returns {KafkaConsumer} The same consumer instance given to the constructor.
     */
    rawConsumer() {
        return this.kafkaConsumer;
    }
    /**
     * Internal method called by the Readable stream to fetch data.
     *
     * Awaits a single `recv()` on the consumer. A truthy message is pushed
     * downstream; a falsy one ends the stream. Any failure destroys the
     * stream with an Error so that `'error'` listeners are always notified.
     *
     * @private
     * @returns {Promise<void>}
     */
    async _read() {
        let message;
        try {
            message = await this.kafkaConsumer.recv();
        }
        catch (error) {
            this.destroy(KafkaStreamReadable.toError(error));
            return;
        }
        // The stream may have been destroyed while recv() was pending;
        // nothing can be delivered at that point.
        if (this.destroyed) {
            return;
        }
        try {
            // `null` signals end-of-stream to the Readable machinery.
            this.push(message ? message : null);
        }
        catch (error) {
            this.destroy(KafkaStreamReadable.toError(error));
        }
    }
    /**
     * Normalizes a thrown value into an Error instance.
     *
     * Error instances are returned unchanged. Anything else is wrapped so the
     * stream never closes silently; the original value is kept on `cause`.
     *
     * @private
     * @param {*} thrown - The value that was thrown or rejected with.
     * @returns {Error}
     */
    static toError(thrown) {
        if (thrown instanceof Error) {
            return thrown;
        }
        const wrapped = new Error(`KafkaConsumer.recv() failed with a non-Error value: ${String(thrown)}`);
        wrapped.cause = thrown;
        return wrapped;
    }
}
exports.KafkaStreamReadable = KafkaStreamReadable;