-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcementServer.js
100 lines (85 loc) · 3.27 KB
/
cementServer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* Module dependencies.
*/
var amqp = require('amqp'),
routes = require('./routes'),
config = require('./config'),
restTools = require('./lib/restTools'),
resources = require('./lib/resourceService'),
path = require('path');
/**
 * Cement server: everything below runs inside this immediately-invoked
 * function.
 *
 * The function is invoked without a receiver, so in non-strict code (no
 * 'use strict' directive is visible in this file) `this` is the global
 * object. The nested functions read and write the list of open broker
 * connections through `this.connections`.
 */
(function cementServer() {
// Broker connections that have reported 'ready' and not yet closed or failed.
this.connections = [];
/**
 * Opens a connection to the i-th message broker listed in
 * config.cementServer.queueServer and keeps this.connections in sync with
 * the connection's lifecycle events.
 *
 * Must be invoked with `this` set to the object that owns the `connections`
 * array (start() does so through bind).
 *
 * @param {number} i Index into config.cementServer.queueServer.
 */
function createConnection(i) {
  // Event handlers are called with their own receiver, so keep the owner here.
  var self = this;
  var server = config.cementServer.queueServer[i];
  var conn = amqp.createConnection({
    port: server.port,
    host: server.host,
    login: server.login,
    password: server.password,
    heartbeat: server.heartbeat
  });

  // Events for this queue
  conn.on('ready', function() {
    console.log("Message Broker, Queue ready --> Connected to one Message Broker");
    // Do not register the same connection twice should 'ready' fire again
    // (e.g. after a reconnect) without an intervening 'close'.
    if (self.connections.indexOf(conn) < 0) {
      self.connections.push(conn);
    }
  });
  conn.on('close', function() {
    console.log("Message Broker, Queue closed");
    // removeConnection is a sibling function, not a property of `this`;
    // it reads `this.connections`, so pass the owner explicitly.
    removeConnection.call(self, conn);
  });
  conn.on('error', function(error) {
    // Keep the original message and append the error so failures can be diagnosed.
    console.log("Message Broker, Queue error", error);
    removeConnection.call(self, conn);
  });
  conn.on('heartbeat', function() {
    console.log("Message Broker, Heartbeat");
  });
}
/**
 * Drops conn from this.connections; does nothing when conn is not in the list.
 *
 * Reads the list through `this`, so it must be called with the owner of the
 * `connections` array as receiver.
 *
 * @param {Object} conn Connection object to forget.
 */
function removeConnection(conn) {
  var position = this.connections.indexOf(conn);
  if (position < 0) {
    return;
  }
  this.connections.splice(position, 1);
}
/**
 * Publishes data to config.cementServer.queueName through one of the
 * connections in this.connections, picked at random.
 *
 * Reads the list through `this`, so it must be called with the owner of the
 * `connections` array as receiver.
 *
 * @param {*} data Payload handed to the connection's publish().
 * @returns {boolean} true when the payload was published, false when no
 *     connection was available (the payload is logged and not sent).
 */
function pushToTheQueue(data) {
  var available = this.connections.length;
  if (available === 0) {
    // Without this guard the index below would be NaN and publish() would be
    // called on undefined.
    console.log("No Message Broker connection available, message not sent: " +
      JSON.stringify(data));
    return false;
  }
  // Send to one of the connections that is connected to a queue.
  // Math.random() is in [0, 1), so every index 0..available-1 is reachable
  // with equal probability.
  var conNum = Math.floor(Math.random() * available);
  this.connections[conNum].publish(config.cementServer.queueName, data);
  console.log("Loading through connection " + conNum + ' -> ' +
    JSON.stringify(data));
  return true;
}
/**
 * Asks the resource service to create one resource of type
 * config.cementServer.type and logs the outcome.
 */
function addCement() {
  resources.create(config.cementServer.type, function (err, result) {
    if (err) {
      // Do not report success when the resource service signalled a failure.
      console.log('Error creating ' + config.cementServer.type + ': ' + err);
      return;
    }
    console.log(config.cementServer.type + " created");
  });
}
/**
 * Takes one resource of type config.cementServer.type out of the resource
 * service and hands it to pushToTheQueue.
 *
 * Reads this.connections, so it relies on being called with the owner of the
 * `connections` array as receiver (a plain call in non-strict code resolves
 * `this` to the global object, the same object the enclosing function
 * initialises).
 */
function loadCement() {
  var self = this;
  // findAndRemove takes the resource out of the store, so skip the round when
  // there is no broker connection to deliver it through; otherwise the
  // resource would be removed and never published.
  if (self.connections.length === 0) {
    console.log('No Message Broker connection available. Cement stays in the store');
    return;
  }
  // Load the cement to the "truck" (the Queue ;)
  // As soon as we load it to the truck, the resource is delivered
  resources.findAndRemove(config.cementServer.type, function (error, removedCement) {
    if (error) {
      console.log('Error loading cement into the truck: ' + error);
    } else if (removedCement) {
      // Forward the receiver explicitly: pushToTheQueue reads this.connections.
      pushToTheQueue.call(self, removedCement);
      console.log(config.cementServer.type + " loaded into the truck!");
    } else {
      console.log('Cement not found. Wait it to grow');
    }
  });
}
/**
 * Schedules one createConnection call per entry of
 * config.cementServer.queueServer, from the last index down to 0. Each call
 * is deferred with process.nextTick and bound to the current `this`.
 */
function start() {
  // Create connection to the broker
  var remaining = config.cementServer.queueServer.length;
  while (remaining > 0) {
    remaining -= 1;
    process.nextTick(createConnection.bind(this, remaining));
  }
}
// Bootstrap: call resources.createServer with the configured type, queue name
// and description. Once its callback fires, open the broker connections and
// run addCement followed by loadCement every config.cementServer.period.
resources.createServer(config.cementServer.type,
  config.cementServer.queueName,
  config.cementServer.description,
  function (error) {
    if (error) {
      // Surface the failure instead of discarding it. Start-up still proceeds
      // as it did before.
      // NOTE(review): confirm whether a createServer error should abort here.
      console.log('Error creating the ' + config.cementServer.type + ' server: ' + error);
    }
    start();
    setInterval(function() {
      addCement();
      loadCement();
    }, config.cementServer.period);
  });
})();