-
Notifications
You must be signed in to change notification settings - Fork 800
/
Copy pathcommit.go
39 lines (33 loc) · 1.01 KB
/
commit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package kafka
// A commit represents the instruction of publishing an update of the last
// offset read by a program for a topic and partition.
type commit struct {
topic string
partition int
offset int64
}
// makeCommit builds a commit value from a message, the resulting commit takes
// its topic, partition, and offset from the message.
func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
}
}
// makeCommits generates a slice of commits from a list of messages, it extracts
// the topic, partition, and offset of each message and builds the corresponding
// commit slice.
func makeCommits(msgs ...Message) []commit {
commits := make([]commit, len(msgs))
for i, m := range msgs {
commits[i] = makeCommit(m)
}
return commits
}
// commitRequest is the data type exchanged between the CommitMessages method
// and internals of the reader's implementation.
type commitRequest struct {
commits []commit
errch chan<- error
}