-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlite_manager.py
146 lines (122 loc) · 5.2 KB
/
sqlite_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import sqlite3
from sqlite3 import Connection
from typing import List, Union
from track_historical_staked_sui import StakedSuiRef, SuiCoinRef, DeletedObjectRef
class SqliteManager:
def __init__(self, version="v1", purge=True):
if version == "v1":
self.init_v1(purge)
else:
self.init_v2(purge)
def init_v2(self, purge=True):
self.conn = sqlite3.connect("sui_data.db", check_same_thread=False)
cursor = self.conn.cursor()
if purge:
cursor.execute("DROP TABLE IF EXISTS staked_sui_v2")
cursor.execute("DROP TABLE IF EXISTS sui_coins_v2")
self.conn.commit()
cursor.execute("""
CREATE TABLE IF NOT EXISTS staked_sui_v2 (
object_id TEXT NOT NULL,
version INTEGER NOT NULL,
at_epoch INTEGER NOT NULL,
owner TEXT NOT NULL,
pool_id TEXT,
principal INTEGER,
stake_activation_epoch INTEGER,
deleted BOOLEAN NOT NULL,
PRIMARY KEY (object_id, version)
)
""")
self.conn.commit()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sui_coins_v2 (
object_id TEXT NOT NULL,
version INTEGER NOT NULL,
at_epoch INTEGER NOT NULL,
owner TEXT NOT NULL,
balance INTEGER,
deleted BOOLEAN NOT NULL,
PRIMARY KEY (object_id, version)
)
""")
self.conn.commit()
cursor.close()
def init_v1(self, purge=True):
self.conn = sqlite3.connect("sui_data.db", check_same_thread=False)
cursor = self.conn.cursor()
if purge:
cursor.execute("DROP TABLE IF EXISTS staked_sui")
cursor.execute("DROP TABLE IF EXISTS sui_coins")
self.conn.commit()
cursor.execute("""
CREATE TABLE IF NOT EXISTS staked_sui (
object_id TEXT NOT NULL,
version INTEGER NOT NULL,
owner TEXT NOT NULL,
pool_id TEXT NOT NULL,
principal INTEGER NOT NULL,
stake_activation_epoch INTEGER NOT NULL,
at_epoch INTEGER NOT NULL,
PRIMARY KEY (object_id, at_epoch)
)
""")
self.conn.commit()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sui_coins (
object_id TEXT NOT NULL,
version INTEGER NOT NULL,
owner TEXT NOT NULL,
balance INTEGER NOT NULL,
at_epoch INTEGER NOT NULL,
PRIMARY KEY (object_id, at_epoch)
)
""")
self.conn.commit()
cursor.close()
def insert_batch_staked_sui_v2(self, items: List[Union[StakedSuiRef, DeletedObjectRef]]):
cursor = self.conn.cursor()
data = []
for item in items:
if isinstance(item, StakedSuiRef):
data.append((item.object_id, item.version, item.at_epoch, item.owner, item.pool_id, item.principal, item.stake_activation_epoch, False))
else:
data.append((item.object_id, item.version, item.at_epoch, item.owner, None, None, None, True))
cursor.executemany("""
INSERT OR REPLACE INTO staked_sui_v2 (object_id, version, at_epoch, owner, pool_id, principal, stake_activation_epoch, deleted)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", data)
self.conn.commit()
cursor.close()
def insert_batch_sui_coin_v2(self, items: List[Union[SuiCoinRef, DeletedObjectRef]]):
cursor = self.conn.cursor()
data = []
for item in items:
if isinstance(item, SuiCoinRef):
data.append((item.object_id, item.version, item.at_epoch, item.owner, item.balance, False))
else:
data.append((item.object_id, item.version, item.at_epoch, item.owner, None, True))
cursor.executemany("""
INSERT OR REPLACE INTO sui_coins_v2 (object_id, version, at_epoch, owner, balance, deleted)
VALUES (?, ?, ?, ?, ?, ?)
""", data)
self.conn.commit()
cursor.close()
def insert_batch_staked_sui(self, items: List[StakedSuiRef]):
cursor = self.conn.cursor()
data = [(item.object_id, item.version, item.owner, item.pool_id, item.principal, item.stake_activation_epoch, item.at_epoch) for item in items]
cursor.executemany("""
INSERT OR REPLACE INTO staked_sui (object_id, version, owner, pool_id, principal, stake_activation_epoch, at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", data)
self.conn.commit()
cursor.close()
def insert_batch_sui_coin(self, items: List[SuiCoinRef]):
cursor = self.conn.cursor()
data = [(item.object_id, item.version, item.owner, item.balance, item.at_epoch) for item in items]
cursor.executemany("""
INSERT OR REPLACE INTO sui_coins (object_id, version, owner, balance, at_epoch)
VALUES (?, ?, ?, ?, ?)
""", data)
self.conn.commit()
cursor.close()