-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.py
506 lines (455 loc) · 19.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# pyright: reportUnboundVariable=false
from Common.config import (
NONINTERACTIVE,
TARI_BINS_FOLDER,
TARI_DAN_BINS_FOLDER,
BURN_AMOUNT,
COLOR_RESET,
CREATE_ACCOUNTS,
DATA_FOLDER,
DEFAULT_TEMPLATE_FUNCTION,
DELETE_EVERYTHING_BUT_TEMPLATES_BEFORE,
DELETE_TEMPLATES,
DELETE_STDOUT_LOGS,
SPAWN_INDEXERS,
SPAWN_VNS,
SPAWN_WALLETS,
STEP_COLOR,
STEP_OUTER_COLOR,
STEPS_CREATE_ACCOUNT,
STEPS_CREATE_TEMPLATE,
STEPS_RUN_SIGNALLING_SERVER,
STEPS_RUN_TARI_CONNECTOR_TEST_SITE,
STRESS_TEST,
TEMPLATES,
USE_BINARY_EXECUTABLE,
)
from Processes.miner import miner
from Processes.signaling_server import signaling_server
from Stats.stats import stats
from Processes.tari_connector_sample import TariConnectorSample
from Processes.template import Template
from Processes.template_server import Server
from Common.threads import threads
from Common.local_ip import local_ip
from commands import Commands
from webui import JrpcWebuiServer
from template_web import TemplateWebServer
import os
import re
import shutil
import time
import traceback
import webbrowser
from Collections.base_nodes import base_nodes
from Collections.base_wallets import base_wallets
from Collections.validator_nodes import validator_nodes
from Collections.indexers import indexers
from Collections.dan_wallet_daemons import dan_wallets
from typing import Any
# Registry of the test accounts created during setup, keyed by account name.
# Each value is an (account record, owning DAN wallet daemon) tuple; stress_test() reads it.
accounts: dict[str, Any] = {}
def cli_loop():
    """Run the interactive command prompt until the user exits.

    Reads one command per line and dispatches it. A failing command prints
    its error and traceback and the prompt continues. The loop ends on the
    ``exit`` command, on ctrl-c / end-of-input at the prompt, or on an
    unexpected error in the loop itself. On the way out the module-level
    ``webui_server`` reference is deleted.

    Commands are matched case-insensitively. ``eval`` and the output path of
    ``burn`` use the text exactly as typed.
    """
    global miner, tari_connector_sample, webui_server, accounts
    try:
        while True:
            try:
                command = input("Command (press ctrl-c to exit or type 'help'): ")
            except (KeyboardInterrupt, EOFError):
                # ctrl-c at the prompt, or stdin was closed
                print("ctrl-c exiting...")
                break
            for_eval = command
            command = command.lower()
            try:
                if command == "help":
                    # NOTE(review): the help text lists "jrpc" and per-target "http"
                    # variants that are not handled below - confirm intent.
                    print("Commands available : ")
                    print("burn <public_key> <path = 'burn.json'>")
                    print("mine <number of blocks> - to mine blocks")
                    print("grpc <node|wallet> - to get grpc port of node or wallet")
                    print("jrpc <vn <id>|dan <id>|indexer> - to get jrpc port of vn with id <id>, dan wallet with id <id> or indexer")
                    print(
                        "http <vn <id>|dan <id>|indexer|connector|webui> - to get http address of vn with id <id>, dan with id <id>, indexer or connector (connector sample page), webui"
                    )
                    print(
                        "stop <node|wallet|indexer|vn <id>|dan <id>> - to stop node, wallet, indexer, vn with id or dan wallet with id, the command how to run it locally will be printed without the `-n` (non-interactive switch)"
                    )
                    print("live - list of things that are still running from this python (base node, wallet, ...)")
                    print("---")
                    print("All indices are zero based")
                elif command.startswith("burn"):
                    args = command.split()
                    public_key = args[1]
                    # Take the path from the raw input so its letter case is preserved.
                    outfile = for_eval.split()[2] if len(args) > 2 else "burn.json"
                    commands.burn(public_key, outfile, BURN_AMOUNT)
                elif command.startswith("mine"):
                    blocks = int(command.split()[1])
                    commands.mine(blocks)
                elif command.startswith("grpc"):
                    what = command.split()[1]
                    print(commands.grpc(what))
                elif command.startswith("http"):
                    # Whatever target is given, the web UI address is printed and opened.
                    url = f"http://{local_ip}:{webui_server.webui.http_port}"
                    print(url)
                    webbrowser.open(url)
                elif command.startswith("stop"):
                    what = command.split(maxsplit=1)[1]
                    # Every target needs a numeric id, e.g. "vn 0"
                    if r := re.match(r"vn (\d+)", what):
                        vn_id = int(r.group(1))
                        validator_nodes.stop(vn_id)
                    elif r := re.match(r"dan (\d+)", what):
                        dan_id = int(r.group(1))
                        dan_wallets.stop(dan_id)
                    elif r := re.match(r"indexer (\d+)", what):
                        indexer_id = int(r.group(1))
                        indexers.stop(indexer_id)
                    elif r := re.match(r"wallet (\d+)", what):
                        wallet_id = int(r.group(1))
                        base_wallets.stop(wallet_id)
                    elif r := re.match(r"node (\d+)", what):
                        node_id = int(r.group(1))
                        base_nodes.stop(node_id)
                    else:
                        print("Invalid stop command", command)
                elif command.startswith("start"):
                    what = command.split(maxsplit=1)[1]
                    # "<vn|indexer> [index] [64-character key]"
                    r = re.match(r"^(vn|indexer)(?: (\d+))?(?: ([a-zA-Z0-9]{64}))?$", what)
                    if r is None:
                        print("Invalid start command", command)
                        continue
                    what, index, pk = r.groups()
                    if what == "vn":
                        if index is None:
                            # Next free index; 0 when no validator node exists yet.
                            index = max(validator_nodes.validator_nodes.keys(), default=-1) + 1
                        else:
                            index = int(index)
                        validator_nodes.add_validator_node(index)
                        print("Adding VN", index)
                        validator_nodes.validator_nodes[index].register(local_ip, pk)
                    elif what == "indexer":
                        if index is None:
                            # Next free index; 0 when no indexer exists yet.
                            index = max(indexers.indexers.keys(), default=-1) + 1
                        else:
                            index = int(index)
                        indexers.add_indexer(index)
                elif command == "live":
                    # NOTE(review): no local named "base_node" or "wallet" is ever
                    # assigned in this function, so these two checks never fire.
                    if "base_node" in locals():
                        print("Base node is running")
                    if "wallet" in locals():
                        print("Wallet is running")
                    validator_nodes.live()
                    dan_wallets.live()
                    indexers.live()
                elif command == "tx":
                    # template / TEMPLATE_FUNCTION / FUNCTION_ARGS are module globals
                    # set by the template step of the setup script.
                    template.call_function(TEMPLATE_FUNCTION[0], dan_wallets.any().jrpc_client, FUNCTION_ARGS)
                elif command == "stats":
                    # Must be tested before the "st" prefix below, which would
                    # otherwise swallow "stats" and fail parsing a count.
                    print(stats)
                elif command.startswith("st"):
                    if command == "st":
                        stress_test()
                    else:
                        stress_test(int(command.split()[1]))
                elif command.startswith("eval"):
                    # In case you need for whatever reason access to the running python script
                    # SECURITY: evaluates arbitrary operator input; only for trusted local use.
                    eval(for_eval[len("eval ") :])
                elif command == "exit":
                    break
                else:
                    print("Wrong command")
            except Exception as ex:
                print("Command errored:", ex)
                traceback.print_exc()
    except Exception as ex:
        print("Failed in CLI loop:", ex)
        traceback.print_exc()
    del webui_server
# num_of_tx is how many transfers each sending account makes to its partner account
def stress_test(num_of_tx: int = 1, dry_run: bool = False):
    """Send transfers between consecutive pairs of accounts and print timing.

    The names in the module-level ``accounts`` dict are paired in insertion
    order as (0, 1), (2, 3), ...; a trailing unpaired account is skipped. One
    job per pair is queued on ``threads``; each job sends ``num_of_tx``
    transfers from the first account of the pair to the second.

    Args:
        num_of_tx: Number of transfers sent per account pair.
        dry_run: Forwarded unchanged to the wallet's ``transfer`` call.
            NOTE(review): an earlier comment said the dry run is ignored for
            now - confirm whether the wallet honours it.
    """
    global total_num_of_tx
    total_num_of_tx = 0

    def send_tx(account0: str, account1: str):
        """Send ``num_of_tx`` transfers from ``account0`` to ``account1``."""
        global total_num_of_tx
        res_addr = "resource_0101010101010101010101010101010101010101010101010101010101010101"
        acc0, dan0 = accounts[account0]
        acc1, _ = accounts[account1]
        public_key1 = acc1["public_key"]
        for i in range(num_of_tx):
            print(f"tx {account0} -> {account1} ({i})")
            dan0.jrpc_client.transfer(acc0, 2000, res_addr, public_key1, 2000, dry_run)
            # NOTE(review): unsynchronised update of a shared counter; if the jobs
            # really run in parallel threads, increments could be lost.
            total_num_of_tx += 1

    start = time.time()
    threads.set_semaphore_limit(0)
    # Snapshot the names once, then pair them up: (0, 1), (2, 3), ...
    names = list(accounts)
    for sender, receiver in zip(names[::2], names[1::2]):
        threads.add(send_tx, (sender, receiver))
    threads.wait()
    total_time = time.time() - start
    if total_num_of_tx:
        print(f"Total number of Tx {total_num_of_tx}")
        print(f"Total time {total_time}")
        # NOTE(review): reports SPAWN_WALLETS // 2 although the jobs are created
        # per account pair - confirm which number is intended.
        print(f"Number of concurrent TXs : {SPAWN_WALLETS//2}")
        print(f"Avg time for one TX {total_time/total_num_of_tx}")
    else:
        print("No TXs")
def print_step(step_name: str):
    """Print ``step_name`` in upper case as a coloured ``### NAME ###`` banner."""
    fence = f"{STEP_OUTER_COLOR}###"
    title = f"{STEP_COLOR}{step_name.upper()}"
    print(f"{fence} {title} {fence}{COLOR_RESET}")
def check_executable(bins_folder: str, file_name: str):
    """Abort the script unless ``file_name`` is present in ``bins_folder``.

    The executable may exist under its plain name or with an ``.exe`` suffix.

    Args:
        bins_folder: Directory expected to contain the executable.
        file_name: Executable name without extension.

    Raises:
        SystemExit: With exit status 1 when neither variant exists.
    """
    plain_path = os.path.join(bins_folder, file_name)
    if os.path.exists(plain_path) or os.path.exists(f"{plain_path}.exe"):
        return
    print(f"Copy {file_name} executable to '{bins_folder}' here")
    # Raise SystemExit directly instead of calling the site-provided exit() helper,
    # and use a non-zero status so callers can tell that setup failed.
    raise SystemExit(1)
# Handles of the helper services; all start as None and are assigned by the setup code below.
server = tari_connector_sample = webui_server = template_web_server = commands = None
# Main setup sequence: optional cleanup, start all processes, register VNs, publish templates,
# create accounts, call the template functions, then hand over to the CLI (or idle forever).
# NOTE(review): this copy of the script has no indentation; nesting must be restored before it can run.
try:
# Optional cleanup of sub-directories of DATA_FOLDER, controlled by the DELETE_* flags.
if DELETE_EVERYTHING_BUT_TEMPLATES_BEFORE or DELETE_STDOUT_LOGS:
if os.path.exists(DATA_FOLDER):
for file in os.listdir(DATA_FOLDER):
full_path = os.path.join(os.getcwd(), DATA_FOLDER, file)
if os.path.isdir(full_path):
if DELETE_EVERYTHING_BUT_TEMPLATES_BEFORE:
# "templates" is only removed when DELETE_TEMPLATES is set as well
if file != "templates" or DELETE_TEMPLATES:
shutil.rmtree(full_path)
else:
# re.match anchors at the start, so this matches names beginning with "stdout"
if re.match(r"stdout", file):
shutil.rmtree(full_path)
# With prebuilt binaries, check that each required executable is present before starting anything.
if USE_BINARY_EXECUTABLE:
print_step("!!! YOU ARE USING EXECUTABLE BINARIES AND NOT COMPILING THE CODE !!!")
print_step(f"Tari folder {TARI_BINS_FOLDER}")
print_step(f"Tari dan folder {TARI_DAN_BINS_FOLDER}")
check_executable(TARI_BINS_FOLDER, "minotari_node")
check_executable(TARI_BINS_FOLDER, "minotari_console_wallet")
check_executable(TARI_BINS_FOLDER, "minotari_sha")
check_executable(TARI_DAN_BINS_FOLDER, "tari_indexer")
check_executable(TARI_DAN_BINS_FOLDER, "tari_dan_wallet_daemon")
check_executable(TARI_DAN_BINS_FOLDER, "tari_dan_wallet_cli")
check_executable(TARI_DAN_BINS_FOLDER, "tari_signaling_server")
check_executable(TARI_DAN_BINS_FOLDER, "tari_validator_node")
check_executable(TARI_DAN_BINS_FOLDER, "tari_validator_node_cli")
# Create the data folder and its "stdout" sub-folder; any error is silently ignored
# (presumably to tolerate folders that already exist).
try:
os.mkdir(DATA_FOLDER)
except:
pass
try:
os.mkdir(os.path.join(DATA_FOLDER, "stdout"))
except:
pass
# Step 1, start the http server for serving wasm files.
print_step("STARTING HTTP SERVER")
server = Server()
server.run()
if STEPS_RUN_SIGNALLING_SERVER:
print_step("Starting signalling server")
signaling_server.start(local_ip)
if STEPS_RUN_TARI_CONNECTOR_TEST_SITE:
if not STEPS_RUN_SIGNALLING_SERVER:
print("Starting tari-connector test without signaling server is pointless!")
else:
print_step("Starting tari-connector test website")
tari_connector_sample = TariConnectorSample(signaling_server_address=signaling_server.address)
# The template web server is only created when the signaling server has an address.
if signaling_server.address:
template_web_server = TemplateWebServer(signaling_server.address)
# Bundle the service handles for the CLI commands and the JSON-RPC web UI.
commands = Commands(tari_connector_sample, template_web_server, server, signaling_server)
webui_server = JrpcWebuiServer(commands)
# Template name -> Template object, one per comma-separated entry of TEMPLATES.
templates: dict[str, Template] = {}
if STEPS_CREATE_TEMPLATE:
print_step("GENERATING TEMPLATE")
# Generate template
for t in TEMPLATES.split(","):
templates[t] = Template(t)
print_step("STARTING BASE NODE")
# Start base node
base_nodes.add()
print_step("STARTING WALLET")
# Start wallet
base_wallets.add()
# Set ports for miner
miner.start(base_nodes.any().grpc_port, base_wallets.any().grpc_client.get_address().address.hex(), local_ip)
# Mine some blocks
miner.mine((SPAWN_VNS + SPAWN_INDEXERS + SPAWN_WALLETS) * 2 + 13) # Make sure we have enough funds
if SPAWN_INDEXERS > 0:
print_step("STARTING INDEXERS")
def spawn_indexer():
indexers.add()
# Queue one spawn job per requested indexer and wait for all of them.
for id in range(SPAWN_INDEXERS):
threads.add(spawn_indexer, ())
threads.wait()
indexers.wait_for_sync()
# connections = indexer.jrpc_client.get_connections()
# comms_stats = indexer.jrpc_client.get_comms_stats()
# print(connections)
# print(comms_stats)
if SPAWN_INDEXERS == 0 and SPAWN_WALLETS > 0: # type: ignore
raise Exception("Can't create a wallet when there is no indexer")
print_step("CREATING DAN WALLETS DAEMONS")
def create_wallet():
dan_wallets.add()
# Wallet daemons are only queued when at least one indexer was spawned.
for id in range(SPAWN_WALLETS):
if SPAWN_INDEXERS > 0:
threads.add(create_wallet, ())
threads.wait()
print(dan_wallets.items.keys())
# Create a new key so we register all VNs to this public key and then create TestAccount_0 for it. So we can claim the fees.
dan_wallets[0].jrpc_client.auth()
new_key = dan_wallets[0].jrpc_client.keys_create()
if not new_key:
raise Exception("Failed to create a key")
claim_public_key: str = new_key["public_key"]
new_key_id: str = new_key["id"]
# Start VNs
print_step("CREATING VNS")
for vn_id in range(SPAWN_VNS):
validator_nodes.add()
validator_nodes.wait_for_sync()
print_step("REGISTER THE VNS")
# Register VNs
validator_nodes.register(claim_public_key)
miner.mine(23)
validator_nodes.wait_for_sync()
indexers.wait_for_sync()
# Retry auth() once per second until the wallet daemon with id d_id answers without raising.
def spawn_wallet(d_id: int):
while True:
try:
dan_wallets[d_id].jrpc_client.auth()
break
except:
time.sleep(1)
print(f"Dan Wallet {d_id} created")
for dwallet_id in dan_wallets:
threads.add(spawn_wallet, (dwallet_id,))
threads.wait()
if STEPS_CREATE_TEMPLATE:
# Publish template
print_step("PUBLISHING TEMPLATE")
for t in templates.values():
t.publish_template(validator_nodes.any().json_rpc_port, server.port, local_ip)
miner.mine(4)
# Wait for the VNs to pickup the blocks from base layer
# TODO wait for VN to download and activate the template
validator_nodes.wait_for_sync()
indexers.wait_for_sync()
if STEPS_CREATE_ACCOUNT:
print_step("CREATING ACCOUNTS")
start = time.time()
# Create "TestAccount_<i>" with free test coins on wallet number i % SPAWN_WALLETS.
# Account 0 is created with the key generated above (new_key_id).
def create_account(i: int, amount: int):
name = {"Name": f"TestAccount_{i}"}
dan_wallet_jrpc = dan_wallets[i % SPAWN_WALLETS].jrpc_client
print(f"Account {name['Name']} creation started")
if i == 0:
dan_wallet_jrpc.create_free_test_coins(name, amount, key_id=int(new_key_id))
else:
dan_wallet_jrpc.create_free_test_coins(name, amount)
print(f"Account {name['Name']} created")
# Semaphore limit is 1 while accounts are created and 10 afterwards
# (presumably the number of jobs `threads` runs at once - confirm).
threads.set_semaphore_limit(1)
for i in range(CREATE_ACCOUNTS):
threads.add(create_account, (i, 10000000))
threads.wait()
threads.set_semaphore_limit(10)
# Poll each wallet once per second until it lists its share of the accounts
# (CREATE_ACCOUNTS // SPAWN_WALLETS, plus one for the first CREATE_ACCOUNTS % SPAWN_WALLETS wallets),
# then record those accounts in `accounts` under their name.
for did in dan_wallets:
while True:
accs = dan_wallets[did].jrpc_client.accounts_list(0, CREATE_ACCOUNTS)["accounts"]
print("accs", len(accs), end="\r")
if len(accs) != CREATE_ACCOUNTS // SPAWN_WALLETS + (did < CREATE_ACCOUNTS % SPAWN_WALLETS and 1 or 0):
time.sleep(1)
else:
break
for acc in sorted(accs, key=lambda acc: acc["account"]["name"]):
accounts[acc["account"]["name"]] = (acc, dan_wallets[did])
print()
# Disabled burn / claim-burn flow, kept commented out below.
# burns = {}
# accounts = {}
# BURN_AMOUNT = 10000
# print_step(f"BURNING {BURN_AMOUNT}")
# for id in dan_wallets:
# dan_wallet_jrpc = dan_wallets[id].jrpc_client
# account = dan_wallet_jrpc.accounts_list(0, 1)["accounts"][0]
# accounts[id] = account
# public_key = account["public_key"]
# burns[id] = wallet.grpc_client.burn(BURN_AMOUNT, bytes.fromhex(public_key))
# del dan_wallet_jrpc
# del public_key
# del account
# # Wait for the burn to be in the mempool
# print("Waiting for all burns to be in mempool.", end="")
# while base_node.grpc_client.get_mempool_size() != len(dan_wallets):
# print(".", end="")
# time.sleep(1)
# miner.mine(4) # Mine the burns
# print("done")
# print("Wait until they are all mined.", end="")
# while base_node.grpc_client.get_mempool_size() != 0:
# print(".", end="")
# miner.mine(1) # Mine the burns
# time.sleep(1)
# print("done")
# miner.mine(3) # mine 3 more blocks to have confirmation
# validator_nodes.wait_for_sync()
# time.sleep(10)
# print_step("CLAIM BURN")
# for id in dan_wallets:
# dan_wallet_jrpc = dan_wallets[id].jrpc_client
# dan_wallet_jrpc.claim_burn(burns[id], accounts[id])
# del dan_wallet_jrpc
# print_step("CHECKING THE BALANCE")
# for id in dan_wallets:
# # Claim the burn
# for id in dan_wallets:
# dan_wallet_jrpc = dan_wallets[id].jrpc_client
# while (
# dan_wallet_jrpc.get_balances(accounts[id])["balances"][0]["balance"]
# + dan_wallet_jrpc.get_balances(accounts[id])["balances"][0]["confidential_balance"]
# == 0
# ):
# time.sleep(1)
# del dan_wallet_jrpc
# print_step("BURNED AND CLAIMED")
if STEPS_CREATE_TEMPLATE:
print_step("Creating template")
# Call the function
# DEFAULT_TEMPLATE_FUNCTION holds "|"-separated entries of the form
# "<template>::<method>[=arg1,arg2,...]"; a "!" in the method part sets dump_into_account
# and is stripped from the method name before the call.
for function in DEFAULT_TEMPLATE_FUNCTION.split("|"):
TEMPLATE_FUNCTION = function.split("=")
FUNCTION_ARGS = len(TEMPLATE_FUNCTION) > 1 and TEMPLATE_FUNCTION[1].split(",") or []
print(TEMPLATE_FUNCTION)
print(FUNCTION_ARGS)
template_name = TEMPLATE_FUNCTION[0].split("::")
template = templates[template_name[0]]
dump_into_account = "!" in template_name[1]
method = template_name[1].replace("!", "")
template.call_function(method, dan_wallets.any().jrpc_client, FUNCTION_ARGS, dump_into_account)
if STRESS_TEST:
stress_test()
print(stats)
if not NONINTERACTIVE:
cli_loop()
else:
print("Non-interactive mode")
# No prompt in this mode: sleep in 10 second steps until the process is interrupted.
while True:
time.sleep(10)
except Exception as ex:
print("Failed setup:", ex)
traceback.print_exc()
except KeyboardInterrupt:
# KeyboardInterrupt is not a subclass of Exception, so it needs its own handler.
print("ctrl-c pressed during setup")
# Teardown: drop the module-level references to every service and collection.
# NOTE(review): presumably deleting these objects is what stops the child processes,
# which would make the order below significant - confirm before reordering.
# webui_server is not deleted here; cli_loop() already deletes it when it returns.
# del webui_server
del template_web_server
del commands
del tari_connector_sample
del signaling_server
del dan_wallets
del indexers
del validator_nodes
del base_wallets
del base_nodes
del server