-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathfan_out.bdl
76 lines (65 loc) · 3.13 KB
/
fan_out.bdl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!benchDL
#######
# Broadcast Scenario:
# 10k subscribers reading from the same topic "channels/<MF_MZBENCH_CHANNEL_ID>/messages"
# 1 publisher sending 1 msg/s to topic "channels/<MF_MZBENCH_CHANNEL_ID>/messages"
# Overall Msg rate: 10k msg/s delivered (10k subscribers x 1 msg/s published)
# Message Size: 100 random bytes
# Runtime: 5 min
# QoS level: 2
# NOTE: Scenario's values can be changed with environment variables predefined in defaults
# Set environment variables to connect to Mainflux instance:
# MF_MZBENCH_MQTT_ENDPOINT = IP/domain of MQTT endpoint on Mainflux
# MF_MZBENCH_MQTT_PORT = Port of MQTT endpoint on Mainflux
# MF_MZBENCH_THING_1_ID = First thing ID of pre-provisioned Mainflux thing
# MF_MZBENCH_THING_1_KEY = First thing key of pre-provisioned Mainflux thing
# MF_MZBENCH_THING_2_ID = Second thing ID of pre-provisioned Mainflux thing
# MF_MZBENCH_THING_2_KEY = Second thing key of pre-provisioned Mainflux thing.
# MF_MZBENCH_CHANNEL_ID = ID of pre-provisioned channel on which both things are connected
#######
# Fallback values for the scenario variables read below via var()/numvar().
# Per the header note, each can be changed through the environment variable
# of the same name.
defaults(
    "MF_MZBENCH_MQTT_ENDPOINT" = "127.0.0.1", # broker host handed to connect()
    "MF_MZBENCH_MQTT_PORT" = 1883,            # broker port handed to connect()
    "MF_MZBENCH_SUB_NUM" = 10000,             # size of the subscriber pool
    "MF_MZBENCH_PUB_TIME" = 5,                # publish loop duration, in minutes
    "MF_MZBENCH_PUB_RATE" = 1,                # publish loop rate, in requests per second
    "MF_MZBENCH_MSG_SIZE" = 100,              # size argument given to random_binary()
    "MF_MZBENCH_QOS" = 2                      # QoS used for both subscribe and publish
)
# Install the worker code from the vmq_mzbench repository (master branch)
# before the pools run; the pools below use worker_type = mqtt_worker.
make_install(
    git = "https://github.com/erlio/vmq_mzbench.git",
    branch = "master"
)
# Subscriber pool: MF_MZBENCH_SUB_NUM workers that each connect as thing 1,
# wait until every subscriber has connected, pause 10 sec, then subscribe to
# "channels/<MF_MZBENCH_CHANNEL_ID>/messages".
# NOTE(review): the publisher pool only waits on its own "connect1" signal plus
# 4 sec, so it is not synchronized with the "subscribe1" barrier used here --
# confirm that messages published before the subscriptions exist are acceptable.
pool(
    size = numvar("MF_MZBENCH_SUB_NUM"),
    worker_type = mqtt_worker,
    worker_start = poisson(numvar("MF_MZBENCH_SUB_NUM") rps) # Poisson-paced worker start-up
):
    connect(
        host = var("MF_MZBENCH_MQTT_ENDPOINT"),
        port = numvar("MF_MZBENCH_MQTT_PORT"),
        client = fixed_client_id("subscriber", worker_id()), # client id derived from the worker id
        clean_session = true,
        keepalive_interval = 60, # presumably seconds -- TODO confirm
        username = var("MF_MZBENCH_THING_1_ID"),
        password = var("MF_MZBENCH_THING_1_KEY"),
        proto_version = 4, # presumably MQTT 3.1.1 (protocol level 4) -- TODO confirm
        reconnect_timeout = 4
    )
    set_signal(subscribe1, 1) # each worker adds 1 to "subscribe1" after connect()
    wait_signal(subscribe1, numvar("MF_MZBENCH_SUB_NUM")) # barrier: wait for all subscribers
    wait(10 sec)
    subscribe(
        sprintf("channels/~s/messages", [var("MF_MZBENCH_CHANNEL_ID")]),
        numvar("MF_MZBENCH_QOS")
    )
# Publisher pool: a single worker that connects as thing 2, waits 4 sec, then
# publishes random payloads to "channels/<MF_MZBENCH_CHANNEL_ID>/messages" for
# MF_MZBENCH_PUB_TIME minutes at MF_MZBENCH_PUB_RATE rps, and finally disconnects.
pool(
    size = 1,
    worker_type = mqtt_worker
):
    connect(
        host = var("MF_MZBENCH_MQTT_ENDPOINT"),
        port = numvar("MF_MZBENCH_MQTT_PORT"),
        client = fixed_client_id("publisher", worker_id()), # client id derived from the worker id
        clean_session = true,
        keepalive_interval = 60, # presumably seconds -- TODO confirm
        username = var("MF_MZBENCH_THING_2_ID"),
        password = var("MF_MZBENCH_THING_2_KEY"),
        proto_version = 4, # presumably MQTT 3.1.1 (protocol level 4) -- TODO confirm
        reconnect_timeout = 4
    )
    set_signal(connect1, 1)  # NOTE(review): set and awaited only by this one worker
    wait_signal(connect1, 1) # so this does not wait for the subscriber pool
    wait(4 sec)
    loop(
        time = numvar("MF_MZBENCH_PUB_TIME") min,
        rate = numvar("MF_MZBENCH_PUB_RATE") rps
    ):
        publish(
            sprintf("channels/~s/messages", [var("MF_MZBENCH_CHANNEL_ID")]),
            random_binary(numvar("MF_MZBENCH_MSG_SIZE")),
            numvar("MF_MZBENCH_QOS")
        )
    disconnect()