-
Notifications
You must be signed in to change notification settings - Fork 483
/
Copy pathkcat.h
237 lines (195 loc) · 6.72 KB
/
kcat.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
/*
* kcat - Apache Kafka consumer and producer
*
* Copyright (c) 2015-2019, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
#include "rdport.h"
#ifdef _MSC_VER
#pragma comment(lib, "librdkafka.lib")
#include "win32/win32_config.h"
#else
#include "config.h"
#endif
#if ENABLE_AVRO
#include <libserdes/serdes.h>
#endif
#ifdef RD_KAFKA_V_HEADER
#define HAVE_HEADERS 1
#else
#define HAVE_HEADERS 0
#endif
#if RD_KAFKA_VERSION >= 0x000b0500
#define HAVE_CONTROLLERID 1
#else
#define HAVE_CONTROLLERID 0
#endif
#if RD_KAFKA_VERSION >= 0x01030000
#define ENABLE_MOCK 1
#endif
typedef enum {
KC_FMT_STR,
KC_FMT_OFFSET,
KC_FMT_KEY,
KC_FMT_KEY_LEN,
KC_FMT_PAYLOAD,
KC_FMT_PAYLOAD_LEN,
KC_FMT_PAYLOAD_LEN_BINARY,
KC_FMT_TOPIC,
KC_FMT_PARTITION,
KC_FMT_TIMESTAMP,
KC_FMT_HEADERS
} fmt_type_t;
#define KC_FMT_MAX_SIZE 128
typedef enum {
KC_MSG_FIELD_VALUE,
KC_MSG_FIELD_KEY,
KC_MSG_FIELD_CNT
} kc_msg_field_t;
struct conf {
int run;
int verbosity;
int exitcode;
int exitonerror;
char mode;
int flags;
#define CONF_F_FMT_JSON 0x1 /* JSON formatting */
#define CONF_F_KEY_DELIM 0x2 /* Producer: use key delimiter */
#define CONF_F_OFFSET 0x4 /* Print offsets */
#define CONF_F_TEE 0x8 /* Tee output when producing */
#define CONF_F_NULL 0x10 /* -Z: Send empty messages as NULL */
#define CONF_F_LINE 0x20 /* Read files in line mode when producing */
#define CONF_F_APIVERREQ 0x40 /* Enable api.version.request=true */
#define CONF_F_APIVERREQ_USER 0x80 /* User set api.version.request */
#define CONF_F_NO_CONF_SEARCH 0x100 /* Disable default config file search */
#define CONF_F_BROKERS_SEEN 0x200 /* Brokers have been configured */
#define CONF_F_FMT_AVRO_KEY 0x400 /* Convert key from Avro to JSON */
#define CONF_F_FMT_AVRO_VALUE 0x800 /* Convert value from Avro to JSON */
#define CONF_F_SR_URL_SEEN 0x1000 /* schema.registry.url/-r seen */
char *delim;
size_t delim_size;
char *key_delim;
size_t key_delim_size;
struct {
fmt_type_t type;
const char *str;
int str_len;
} fmt[KC_FMT_MAX_SIZE];
int fmt_cnt;
/**< Producer: per-field pack-format, see pack()
* Consumer: per-field unpack-format, see unpack(). */
const char *pack[KC_MSG_FIELD_CNT];
int msg_size;
char *brokers;
char *topic;
int32_t partition;
rd_kafka_headers_t *headers;
char *group;
char *fixed_key;
int32_t fixed_key_len;
int64_t offset;
#if RD_KAFKA_VERSION >= 0x00090300
int64_t startts;
int64_t stopts;
#endif
int64_t msg_cnt;
int exit_eof;
int eof_cnt; /**< Current number of partitions reached EOF */
rd_kafka_topic_partition_list_t *assignment; /**< Current -G consumer
* assignment */
int metadata_timeout;
char *null_str;
int null_str_len;
int txn;
rd_kafka_conf_t *rk_conf;
rd_kafka_topic_conf_t *rkt_conf;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
char *debug;
int term_sig; /**< Termination signal */
#if ENABLE_AVRO
serdes_conf_t *srconf;
char *schema_registry_url;
#endif
#if ENABLE_MOCK
struct {
int broker_cnt;
} mock;
#endif
};
extern struct conf conf;
void RD_NORETURN fatal0 (const char *func, int line,
const char *fmt, ...);
void error0 (int erroronexit, const char *func, int line,
const char *fmt, ...);
#define KC_FATAL(.../*fmt*/) fatal0(__FUNCTION__, __LINE__, __VA_ARGS__)
#define KC_ERROR(.../*fmt*/) error0(conf.exitonerror, __FUNCTION__, __LINE__, __VA_ARGS__)
/* Info printout */
#define KC_INFO(VERBLVL,.../*fmt*/) do { \
if (conf.verbosity >= (VERBLVL)) \
fprintf(stderr, "%% " __VA_ARGS__); \
} while (0)
/*
* format.c
*/
void pack_check (const char *what, const char *fmt);
void fmt_msg_output (FILE *fp, const rd_kafka_message_t *rkmessage);
void fmt_parse (const char *fmt);
void fmt_init (void);
void fmt_term (void);
#if ENABLE_JSON
/*
* json.c
*/
void fmt_msg_output_json (FILE *fp, const rd_kafka_message_t *rkmessage);
void metadata_print_json (const struct rd_kafka_metadata *metadata,
int32_t controllerid);
void partition_list_print_json (const rd_kafka_topic_partition_list_t *parts,
void *json_gen);
void fmt_init_json (void);
void fmt_term_json (void);
int json_can_emit_verbatim (void);
#endif
#if ENABLE_AVRO
/*
* avro.c
*/
char *kc_avro_to_json (const void *data, size_t data_len, int *schema_idp,
char *errstr, size_t errstr_size);
void kc_avro_init (const char *key_schema_name,
const char *key_schema_path,
const char *value_schema_name,
const char *value_schema_path);
void kc_avro_term (void);
#endif
/*
* tools.c
*/
int query_offsets_by_time (rd_kafka_topic_partition_list_t *offsets);