-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclickhouse_dump.go
158 lines (136 loc) · 3.92 KB
/
clickhouse_dump.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package clickhousegraphqlgo
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/ClickHouse/clickhouse-go"
kiteticker "github.com/zerodha/gokiteconnect/v3/ticker"
)
// Package-level state shared between ClickhouseDump and the ticker callbacks
// declared in this file.
var (
	// ticker is the Kite ticker whose callbacks are registered in
	// ClickhouseDump and which onConnect uses to subscribe.
	ticker *kiteticker.Ticker

	// dbClient is the database handle the callbacks operate on.
	dbClient *sql.DB

	// token_list holds the instrument tokens passed to ClickhouseDump.
	token_list []uint32

	// err is a package-level error variable; prefer function-local errors in
	// new code.
	err error
)
// New creates a Client backed by a ClickHouse connection.
//
// clientPar.DBSource is the ClickHouse DSN; when it is empty the local default
// "tcp://127.0.0.1:9000?debug=true" is used. See
// https://github.com/ClickHouse/clickhouse-go#dsn for the DSN format.
// clientPar.ApiKey and clientPar.AccessToken are copied into the returned
// Client unchanged.
//
// New terminates the process via log.Fatalf when the connection handle cannot
// be opened or when either schema cannot be created. A failed ping is only
// printed; construction continues afterwards.
func New(clientPar ClientParam) *Client {
	// Use DSN as your clickhouse DB setup.
	if clientPar.DBSource == "" {
		clientPar.DBSource = "tcp://127.0.0.1:9000?debug=true"
	}

	connect, err := sql.Open("clickhouse", clientPar.DBSource)
	if err != nil {
		// Without this check the error was overwritten by the Ping result and
		// a nil handle would be dereferenced by connect.Ping() below.
		log.Fatalf("failed to open clickhouse connection, error: %v", err)
	}

	// A ping failure is reported but is not fatal here.
	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			fmt.Println(err)
		}
	}

	schemaSingle, err := createSchema(connect, "single")
	if err != nil {
		log.Fatalf("failed to create single object schema, error: %v", err)
	}

	schemaList, err := createSchema(connect, "List")
	if err != nil {
		log.Fatalf("failed to create list of object schema, error: %v", err)
	}

	return &Client{
		dbClient:     connect,
		apiKey:       clientPar.ApiKey,
		accessToken:  clientPar.AccessToken,
		schemaSingle: &schemaSingle,
		schemaList:   &schemaList,
	}
}
// setDB creates the tickstore table if it does not already exist.
//
// The statement is executed on the client's own database handle. When it
// fails the process is terminated via log.Fatalf.
func (c *Client) setDB() {
	const createTable = `
CREATE TABLE IF NOT EXISTS tickstore (
instrument_token UInt32,
timestamp DateTime('Asia/Calcutta'),
last_price FLOAT(),
average_traded_price FLOAT(),
volume_traded UInt32,
oi UInt32
) engine=MergeTree()
ORDER BY (timestamp)
`
	// The error is kept local to this call instead of being written to the
	// package-level err variable, which is shared mutable state.
	if _, err := c.dbClient.Exec(createTable); err != nil {
		log.Fatalf("Error creating table: %v", err)
	}
}
// onError is the ticker error callback: it prints the received error to
// standard output, prefixed with "Error: ".
func onError(err error) {
	const prefix = "Error: "
	fmt.Println(prefix, err)
}
// onClose is the ticker callback invoked when the websocket connection is
// closed. It prints the close code and reason, then closes the shared DB
// client and reports any error returned by Close instead of discarding it.
//
// NOTE(review): the reconnect callbacks in this file suggest the ticker may
// reconnect after a close; if so, later ticks would reach onTick with an
// already-closed handle. Confirm whether the DB should really be closed here.
func onClose(code int, reason string) {
	fmt.Println("Close: ", code, reason)

	// Close the DB client after the message has been printed, matching the
	// original deferred ordering.
	if err := dbClient.Close(); err != nil {
		fmt.Println("Error closing DB client: ", err)
	}
}
// onConnect is the ticker callback invoked when the connection is established
// and ready to send and accept data. It subscribes to the package-level token
// list and then sets those tokens to "full" mode. A failure in either step is
// printed and does not stop the other step from running.
func onConnect() {
	if subErr := ticker.Subscribe(token_list); subErr != nil {
		fmt.Println("err: ", subErr)
	}

	if modeErr := ticker.SetMode("full", token_list); modeErr != nil {
		fmt.Println("err: ", modeErr)
	}
}
// onTick is the ticker callback invoked for every received tick. It writes the
// tick into the tickstore table using the package-level DB client.
//
// Each tick is inserted in its own transaction: begin, prepare, execute,
// commit. A failure at any step terminates the process via log.Fatalf.
func onTick(tick kiteticker.Tick) {
	const insertTick = `INSERT INTO tickstore (instrument_token, timestamp, last_price,
average_traded_price, volume_traded, oi) VALUES (?, ?, ?, ?, ?, ?)`

	txn, beginErr := dbClient.Begin()
	if beginErr != nil {
		log.Fatalf("Error starting DB transaction: %v", beginErr)
	}

	insert, prepareErr := txn.Prepare(insertTick)
	if prepareErr != nil {
		log.Fatalf("Error creating sql statement: %v", prepareErr)
	}

	// Arguments follow the column order of the INSERT statement above.
	_, execErr := insert.Exec(
		tick.InstrumentToken,
		tick.Timestamp.Time,
		tick.LastPrice,
		tick.AverageTradePrice,
		tick.VolumeTraded,
		tick.OI,
	)
	if execErr != nil {
		log.Fatalf("Error executing a query: %v", execErr)
	}

	commitErr := txn.Commit()
	if commitErr != nil {
		log.Fatalf("Error committing the sql transaction: %v", commitErr)
	}
}
// onReconnect is the ticker callback invoked when a reconnection is attempted
// (reconnection is enabled by default). It prints the attempt number and the
// delay before the attempt, expressed in seconds.
func onReconnect(attempt int, delay time.Duration) {
	seconds := delay.Seconds()
	fmt.Printf("Reconnect attempt %d in %fs\n", attempt, seconds)
}
// onNoReconnect is the ticker callback invoked when the maximum number of
// reconnect attempts has been made. It prints the attempt count on its own
// line.
func onNoReconnect(attempt int) {
	// The trailing newline keeps this final message from running into
	// whatever is printed next, consistent with onReconnect.
	fmt.Printf("Maximum no of reconnect attempt reached: %d\n", attempt)
}
// ClickhouseDump starts a Kite ticker for the given instrument tokens and
// dumps the received tick data to ClickHouse.
//
// It publishes the client's DB handle and the tokens through package-level
// variables read by the ticker callbacks, creates the tickstore table via
// setDB, builds a ticker from the client's API key and access token,
// registers the callbacks declared in this file and finally calls Serve.
func (c *Client) ClickhouseDump(tokens []uint32) {
	// Share the inputs with the package-level callbacks.
	dbClient = c.dbClient
	token_list = tokens

	// Make sure the destination table exists before any tick arrives.
	c.setDB()

	// Build the ticker and expose it to onConnect through the package variable.
	tk := kiteticker.New(c.apiKey, c.accessToken)
	ticker = tk

	// Connection lifecycle callbacks.
	tk.OnConnect(onConnect)
	tk.OnClose(onClose)
	tk.OnReconnect(onReconnect)
	tk.OnNoReconnect(onNoReconnect)

	// Data and error callbacks.
	tk.OnTick(onTick)
	tk.OnError(onError)

	// Start the connection.
	tk.Serve()
}