This repository has been archived by the owner on Sep 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtr_connector.rb
187 lines (155 loc) · 5.75 KB
/
tr_connector.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
require 'socket'
require 'event_bus'
require 'set'
require 'logger'
require 'wise_omf/server'
require_relative '../resources/event_type'
# the TRConnector handles the tcp socket connection to the testbed runtime
class TRConnector
@@log = Logger.new(STDOUT)
@@log.level = Logger::DEBUG
include Singleton
include De::Uniluebeck::Itm::Tr::Iwsn::Messages
@thread
@socket
@abort = false
def initialize
EventBus.subscribe(Events::DOWN_FLASH_IMAGE, self, :on_flash_image)
EventBus.subscribe(Events::DOWN_MESSAGE, self, :on_message)
EventBus.subscribe(Events::DOWN_RESET, self, :on_nodes_reset)
EventBus.subscribe(Events::DOWN_ARE_NODES_CONNECTED, self, :on_nodes_connected_request)
EventBus.subscribe(Events::DOWN_ARE_NODES_ALIVE, self, :on_nodes_alive_request)
end
def start
@thread = Thread.new {
while !@abort
begin
info "All Sockets: #{TCPSocket.gethostbyname(CONFIG[:trhost])}"
@socket = TCPSocket.new(CONFIG[:trhost], CONFIG[:trport])
info "Connected to #{CONFIG[:trhost]} on #{CONFIG[:trport]}."
break
rescue
warn "Can't connect to testbed runtime. Sleeping for 10 seconds"
sleep 10
end
end
while !@abort
begin
lengthField = @socket.read(4)
unless lengthField.nil?
length = lengthField.unpack('N').first
data = @socket.read(length)
epm = ExternalPluginMessage.parse(data)
#info "parse success"
case epm.type
when ExternalPluginMessage::Type::INTERNAL_MESSAGE
self.handleInternalMessage(epm)
when ExternalPluginMessage::Type::IWSN_MESSAGE
self.handleIwsnMessage(epm)
end
end
rescue Exception => e
error e
end
end
info ">> LOOP End"
socket.close
}
end
def write(message)
@socket.puts(message) unless @socket.nil?
end
# Event Bus Events (Downstream)
def on_flash_image(payload)
pack_and_send_request(payload)
end
def on_message(payload)
pack_and_send_request(payload)
end
def on_nodes_reset(payload)
pack_and_send_request(payload)
end
def on_nodes_connected_request(payload)
pack_and_send_request(payload)
end
def on_nodes_alive_request(payload)
pack_and_send_request(payload)
end
def pack_request(payload)
message = Message.new
message.type = Message::Type::REQUEST
message.request = payload[:request]
external = ExternalPluginMessage.new
external.type = ExternalPluginMessage::Type::IWSN_MESSAGE
external.iwsn_message = message
str = external.serialize_to_string
@@log.debug "External Message: #{external.to_hash}"
@@log.debug "Bytes: #{str.bytes}"
@@log.debug "Bytesize: #{str.bytesize}"
length = [str.bytesize].pack('N')
return length, str
end
def pack_and_send_request(payload)
length, msg = pack_request(payload)
unless @socket.nil?
@socket.send(length, 0)
@socket.send(msg, 0)
else
@@log.error('Can\'t write to socket.')
end
end
# Testbed Events (Upstream)
def handleInternalMessage(epm)
#info "Internal Message: #{epm.internal_message.to_s}"
case epm.internal_message.type
when InternalMessage::Type::RESERVATION_EVENT
re = epm.internal_message.reservationEvent
case re.type
when ReservationEvent::Type::STARTED
EventBus.publish(Events::RESERVATION_STARTED, event: re)
when ReservationEvent::Type::ENDED
EventBus.publish(Events::RESERVATION_ENDED, event: re)
end
# no other cases atm
end
end
def handleIwsnMessage(epm)
#info "Iwsn Message: #{epm.iwsn_message.to_s}"
message = epm.iwsn_message
debug "Received iwsn message from testbed: #{message.to_hash}"
case message.type
when Message::Type::EVENT
handleIwsnEvent(message.event)
when Message::Type::RESPONSE
EventBus.publish(Events::IWSN_RESPONSE, event: message.response, requestId: message.response.requestId, nodeUrns: Set.new([message.response.nodeUrn]))
when Message::Type::PROGRESS
EventBus.publish(Events::IWSN_PROGRESS, event: message.progress, requestId: message.progress.requestId, nodeUrns: Set.new([message.progress.nodeUrn]))
when Message::Type::GET_CHANNELPIPELINES_RESPONSE
EventBus.publish(Events::IWSN_GET_CHANNEL_PIPELINES_RESPONSE, event: message.getChannelPipelinesResponse, requestId: message.getChannelPipelinesResponse.requestId)
end
end
def handleIwsnEvent(event)
case event.type
when Event::Type::UPSTREAM_MESSAGE
EventBus.publish(Events::IWSN_UPSTREAM_MESSAGE, event: event.upstreamMessageEvent, eventId: event.eventId, nodeUrns: Set.new([event.upstreamMessageEvent.sourceNodeUrn]))
when Event::Type::DEVICES_DETACHED
EventBus.publish(Events::IWSN_DEVICES_DETACHED, event: event.devicesDetachedEvent, eventId: event.eventId, nodeUrns: Set.new(event.devicesDetachedEvent.nodeUrns))
when Event::Type::DEVICES_ATTACHED
EventBus.publish(Events::IWSN_DEVICES_ATTACHED, event: event.devicesAttachedEvent, eventId: event.eventId, nodeUrns: Set.new(event.devicesAttachedEvent.nodeUrns))
when Event::Type::NOTIFICATION
unless event.notificationEvent.nodeUrn.nil?
EventBus.publish(Events::IWSN_NOTIFICATION, event: event.notificationEvent, eventId: event.eventId, nodeUrns: Set.new([event.notificationEvent.nodeUrn]))
else
EventBus.publish(Events::IWSN_NOTIFICATION, event: event.notificationEvent, eventId: event.eventId)
end
end
end
def abort
if !@abort
@abort = true
info 'Aborting TRConnector'
@thread.kill
@socket.close unless @socket.nil?
end
end
end