-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpkg.go
213 lines (184 loc) · 6.23 KB
/
pkg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package main
import (
"bytes"
"fmt"
"os"
"github.com/adammw/kinesiscat/kpl"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/golang/protobuf/proto"
"github.com/jessevdk/go-flags"
"sync"
"time"
)
// Md5Len is the number of trailing bytes stripped from an aggregated record
// before its protobuf payload is decoded.
const Md5Len = 16

var (
	// ProtobufHeader is the byte prefix that marks a record as aggregated.
	ProtobufHeader = []byte("\xf3\x89\x9a\xc2")

	// Newline is appended after every record written to the output.
	Newline = []byte("\n")

	// ExponentialBackoffBase is the first retry delay; it is doubled (shifted
	// left) for each consecutive throughput error.
	ExponentialBackoffBase = time.Second
)
// Options holds the command-line configuration. The struct tags are read by
// the flags parser invoked from main.
type Options struct {
	// Region is the AWS region; required, with AWS_REGION as its env source.
	Region string `long:"region" description:"AWS Region" required:"true" env:"AWS_REGION"`

	// IteratorType selects the shard iterator type and defaults to LATEST.
	IteratorType string `short:"t" long:"iterator-type" description:"Shard Iterator Type" default:"LATEST" choice:"AT_SEQUENCE_NUMBER" choice:"AFTER_SEQUENCE_NUMBER" choice:"AT_TIMESTAMP" choice:"TRIM_HORIZON" choice:"LATEST"`

	// Timestamp is the starting point for the AT_TIMESTAMP iterator; it is
	// interpreted as Unix seconds when the shard iterators are requested.
	Timestamp int64 `long:"timestamp" description:"Starting timestamp (used with AT_TIMESTAMP iterator)"`

	// SequenceNumber is the starting point for the *_SEQUENCE_NUMBER iterators.
	SequenceNumber string `long:"sequence-number" description:"Starting sequence number (used with *_SEQUENCE_NUMBER iterators)"`

	// Args collects the positional arguments.
	Args struct {
		// StreamName is the stream to read; empty when none was given.
		StreamName string `positional-arg-name:"STREAM_NAME"`
	} `positional-args:"yes"`
}
// exitFn is the process-exit hook used throughout this file; it defaults to
// os.Exit. NOTE(review): presumably a variable so tests can stub it - confirm.
var exitFn = os.Exit

// fatalErr formats a message, writes it to standard error, and then calls
// exitFn with status 1. No newline is added; the format must carry its own.
func fatalErr(format string, a ...interface{}) {
	message := fmt.Sprintf(format, a...)
	os.Stderr.WriteString(message)
	exitFn(1)
}
// fatalfIfErr does nothing when err is nil. Otherwise it hands format and err
// to fatalErr, so format should contain exactly one verb for the error.
func fatalfIfErr(format string, err error) {
	if err == nil {
		return
	}
	fatalErr(format, err)
}
// buildKinesisClient creates a Kinesis client configured for the given region.
// A session-creation error is reported through fatalfIfErr. It is declared as
// a variable rather than a plain function.
// NOTE(review): presumably so tests can substitute a fake client - confirm.
var buildKinesisClient = func(region string) (client kinesisiface.KinesisAPI) {
	cfg := aws.NewConfig().WithRegion(region)
	sess, err := session.NewSession(cfg)
	fatalfIfErr("aws error: %v", err)
	client = kinesis.New(sess, cfg)
	return client
}
func getShardIds(client kinesisiface.KinesisAPI, streamName string) (shardIds []string) {
shardIds = []string{}
params := kinesis.DescribeStreamInput{
StreamName: aws.String(streamName),
}
err := client.DescribeStreamPages(
¶ms,
func(page *kinesis.DescribeStreamOutput, lastPage bool) bool {
for _, shard := range page.StreamDescription.Shards {
shardIds = append(shardIds, *shard.ShardId)
}
return true
},
)
fatalfIfErr("get shards error: %v", err)
return
}
func getShardIterators(client kinesisiface.KinesisAPI, streamName string, shardIds []string, shardIteratorType string, sequenceNumber string, timestampSec int64) (shardIterators []string) {
shardIterators = []string{}
var startingSequenceNumberPtr = &sequenceNumber
if shardIteratorType != kinesis.ShardIteratorTypeAtSequenceNumber && shardIteratorType != kinesis.ShardIteratorTypeAfterSequenceNumber {
startingSequenceNumberPtr = nil
}
var timestampPtr *time.Time = nil
if shardIteratorType == kinesis.ShardIteratorTypeAtTimestamp {
timestamp := time.Unix(timestampSec, 0)
timestampPtr = ×tamp
}
for _, shardId := range shardIds {
iterator, err := client.GetShardIterator(&kinesis.GetShardIteratorInput{
ShardId: &shardId,
ShardIteratorType: &shardIteratorType,
StartingSequenceNumber: startingSequenceNumberPtr,
Timestamp: timestampPtr,
StreamName: &streamName,
})
fatalfIfErr("get iterator error: %v", err)
shardIterators = append(shardIterators, *iterator.ShardIterator)
}
return
}
func streamRecords(client kinesisiface.KinesisAPI, shardIterator string, fn func(*[]byte)) {
var errCount uint = 0
for {
params := kinesis.GetRecordsInput{ShardIterator: &shardIterator}
resp, err := client.GetRecords(¶ms)
if exponentialBackoff(err, &errCount) {
continue
}
eachRecord(resp.Records, fn)
// TODO: can we do `for shardIterator != nil` instead ?
if resp.NextShardIterator == nil {
break
}
shardIterator = *resp.NextShardIterator
}
}
// exponentialBackoff inspects the error from a GetRecords call and reports
// whether the caller should retry.
//
//   - nil error: the counter behind errCount is reset and false is returned.
//   - ProvisionedThroughputExceededException: a notice is written to stderr,
//     the call sleeps for ExponentialBackoffBase shifted left by *errCount,
//     increments the counter, and returns true.
//   - any other error: reported through fatalErr; false is returned if that
//     call comes back.
func exponentialBackoff(err error, errCount *uint) (retry bool) {
	if err == nil {
		// reset backoff counter on success
		*errCount = 0
		return false
	}

	// retry on ProvisionedThroughputExceededException with exponential backoff
	aerr, isAWSErr := err.(awserr.Error)
	if isAWSErr && aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
		delay := ExponentialBackoffBase << *errCount
		fmt.Fprintf(os.Stderr, "Throughput limit exceeded, waiting %v\n", delay)
		time.Sleep(delay)
		*errCount++
		return true
	}

	fatalErr("get records error: %v", err)
	return false
}
// eachRecord calls fn once for every user record contained in
// aggregatedRecords.
//
// A record counts as aggregated when it starts with ProtobufHeader and is long
// enough to also hold the Md5Len-byte trailer. The bytes between header and
// trailer are decoded as a kpl.AggregatedRecord and fn receives each
// sub-record's data. Every other record is passed to fn unchanged, including
// one that starts with the header bytes but is too short to be a valid
// aggregate; slicing such a record would otherwise panic. A protobuf decoding
// error is reported through fatalfIfErr.
//
// fn receives a pointer into the record itself, not a copy.
func eachRecord(aggregatedRecords []*kinesis.Record, fn func(*[]byte)) {
	minAggregatedLen := len(ProtobufHeader) + Md5Len
	for _, record := range aggregatedRecords {
		isAggregated := len(record.Data) >= minAggregatedLen &&
			bytes.HasPrefix(record.Data, ProtobufHeader)
		if !isAggregated {
			fn(&record.Data)
			continue
		}

		// see https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
		agg := &kpl.AggregatedRecord{}
		payload := record.Data[len(ProtobufHeader) : len(record.Data)-Md5Len]
		err := proto.Unmarshal(payload, agg)
		fatalfIfErr("protobuf unmarshal error: %v", err)
		for _, sub := range agg.Records {
			fn(&sub.Data)
		}
	}
}
// parallel runs fn once per element of things, each call in its own
// goroutine, and returns only after every call has finished. With an empty
// input it returns immediately without calling fn.
func parallel(things []string, fn func(string)) {
	var pending sync.WaitGroup
	pending.Add(len(things))
	for i := range things {
		item := things[i]
		go func() {
			defer pending.Done()
			fn(item)
		}()
	}
	pending.Wait()
}
// showAvailableStreams prints a hint followed by the name of every stream
// visible to client, one per line, on standard error.
//
// ListStreams is paginated, so the call is repeated with
// ExclusiveStartStreamName set to the last name received for as long as the
// response reports HasMoreStreams. An API error is reported through
// fatalfIfErr.
func showAvailableStreams(client kinesisiface.KinesisAPI) {
	input := &kinesis.ListStreamsInput{}
	headerShown := false
	for {
		out, err := client.ListStreams(input)
		fatalfIfErr("get streams error: %v", err)

		// Print the hint once, after the first successful call.
		// NOTE(review): the message has typos ("give", "chose"); the text is
		// left unchanged here in case the output is matched elsewhere.
		if !headerShown {
			fmt.Fprintf(os.Stderr, "No stream name give, please chose one of these streams:\n")
			headerShown = true
		}
		for _, name := range out.StreamNames {
			fmt.Fprintf(os.Stderr, "%v\n", *name)
		}

		// Stop on the last page; the empty-page check guards against looping
		// forever if a response claims more streams but returns none.
		if !aws.BoolValue(out.HasMoreStreams) || len(out.StreamNames) == 0 {
			return
		}
		input.ExclusiveStartStreamName = out.StreamNames[len(out.StreamNames)-1]
	}
}
// main parses the command line, resolves the shards of the requested stream,
// and writes every record from all shards to standard output, one per line.
// It exits with status 2 when argument parsing fails or no stream name was
// given; in the latter case the available streams are listed first.
func main() {
	// parse options
	var opts Options
	if _, err := flags.ParseArgs(&opts, os.Args[1:]); err != nil {
		exitFn(2)
	}

	// show available streams if the user did not give one
	client := buildKinesisClient(opts.Region)
	stream := opts.Args.StreamName
	if stream == "" {
		showAvailableStreams(client)
		exitFn(2)
	}

	// Build each output line in a fresh buffer and hand it to Stdout in a
	// single Write call, since shards are read concurrently.
	emit := func(data *[]byte) {
		line := make([]byte, 0, len(*data)+len(Newline))
		line = append(line, *data...)
		line = append(line, Newline...)
		os.Stdout.Write(line)
	}

	// stream from all shards of the stream
	ids := getShardIds(client, stream)
	iterators := getShardIterators(client, stream, ids, opts.IteratorType, opts.SequenceNumber, opts.Timestamp)
	parallel(iterators, func(shardIterator string) {
		streamRecords(client, shardIterator, emit)
	})
}