Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow receive command notifications from CB #800

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
- allow receive command notifications from CB (iotagent-node-lib#1455)
- Fix: check endpoint expression when execute http command
- Fix: use config.defaultTransport (from config.js or IOTA_DEFAULT_TRANSPORT env var) instead of magic 'HTTP' at provision device
- Add: X-Processing-Time response header with processing time (in milliseconds) expended by current HTTP measure (iotagent-node-lib#1650)
- Add: print also IOTA_CONFIG_RETRIEVAL, IOTA_DEFAULT_KEY, IOTA_DEFAULT_TRANSPORT env var values at iotagent startup
- Add: print also IOTA_CONFIG_RETRIEVAL, IOTA_DEFAULT_KEY, IOTA_DEFAULT_TRANSPORT env var values at iotagent startup
58 changes: 50 additions & 8 deletions lib/bindings/HTTPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -756,28 +756,70 @@ function stop(callback) {
}
}

function sendPushNotifications(device, values, callback) {
const executions = _.flatten(values.map(commandHandler.generateCommandExecution.bind(null, null, device)));
function sendPushNotifications(device, group, values, callback) {
const executions = _.flatten(
values.map(commandHandler.generateCommandExecution.bind(null, group.apikey, device, group))
);

async.series(executions, function (error) {
callback(error);
});
}

function storePollNotifications(device, values, callback) {
function storePollNotifications(device, group, values, callback) {
function addPollNotification(item, innerCallback) {
iotAgentLib.addCommand(device.service, device.subservice, device.id, item, innerCallback);
}

async.map(values, addPollNotification, callback);
}

function notificationHandler(device, values, callback) {
if (device.endpoint) {
sendPushNotifications(device, values, callback);
} else {
storePollNotifications(device, values, callback);
config.getLogger().debug(context, 'values for command %j and device %j', values, device);

function invokeWithConfiguration(apiKey, callback) {
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
var cmdValue = { type: 'command' };
for (let val of values) {
if (val.name === 'cmd') {
cmdValue.name = val.value;
} else if (val.name === 'params') {
cmdValue.value = val.value;
} else {
// other fields like status, info, onDelivered, OnError
cmdValue[val.name] = val.value;
}
}
var cmdValues = [cmdValue];
config.getLogger().debug(context, 'cmdValues %j', cmdValues);
iotAgentLib.executeUpdateSideEffects(
device,
device.id,
device.type,
device.service,
device.subservice,
cmdValues,
function () {
if (device.endpoint || group.endpoint) {
sendPushNotifications(device, group, cmdValues, callback);
} else {
storePollNotifications(device, group, cmdValues, callback);
}
}
);
});
}

async.waterfall(
[apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
callback
);
}

exports.start = start;
Expand Down
36 changes: 36 additions & 0 deletions lib/iotagent-json.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,40 @@ function configurationHandler(configuration, callback) {
}
}

/**
* Calls all the command execution handlers for each transport protocol binding whenever a new notification request
* arrives from the Context Broker.
*
* @param {Object} device Device data object containing all stored information about the device.
* @param {Array} values Values recieved in the notification.
*/
function notificationHandler(device, values, callback) {
function invokeWithConfiguration(apiKey, callback) {
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', apiKey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
transportSelector.applyFunctionFromBinding(
[device, values],
'notificationHandler',
device.transport ||
(group && group.transport ? group.transport : undefined) ||
config.getConfig().defaultTransport,
callback
);
});
}

async.waterfall(
[apply(iotaUtils.getEffectiveApiKey, device.service, device.subservice, device), invokeWithConfiguration],
callback
);
}

/**
* Handles incoming updateContext requests related with lazy attributes. This handler is still just registered,
* but empty.
Expand Down Expand Up @@ -156,6 +190,8 @@ function start(newConfig, callback) {

if (config.getConfig().configRetrieval) {
iotAgentLib.setNotificationHandler(configurationNotificationHandler);
} else {
iotAgentLib.setNotificationHandler(notificationHandler);
}

transportSelector.startTransportBindings(newConfig, callback);
Expand Down
109 changes: 109 additions & 0 deletions test/unit/ngsiv2/HTTP_commands_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,112 @@ describe('HTTP: Commands from groups', function () {
});
});
});

describe('HTTP: Commands from CB notifications', function () {
beforeEach(function (done) {
config.logLevel = 'INFO';

nock.cleanAll();

contextBrokerMock = nock('http://192.168.1.1:1026')
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post('/v2/registrations')
.reply(201, null, { Location: '/v2/registrations/6319a7f5254b05844116584d' });

iotagentMqtt.start(config, function () {
done();
});
});

afterEach(function (done) {
nock.cleanAll();
async.series([iotAgentLib.clearAll, iotagentMqtt.stop], done);
});
describe('When a POST measure arrives for an unprovisioned device in a command group', function () {
const optionsMeasure = {
url: 'http://localhost:' + config.http.port + '/iot/json',
method: 'POST',
json: {
h: '33'
},
headers: {
'fiware-service': 'smartgondor',
'fiware-servicepath': '/gardens'
},
qs: {
i: 'JSON_UNPROVISIONED',
k: 'KL223HHV8732SFL1'
}
};
// This mock does not check the payload since the aim of the test is not to verify
// device provisioning functionality. Appropriate verification is done in tests under
// provisioning folder of iotagent-node-lib
beforeEach(function (done) {
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/unprovisionedDevice3.json')
)
.reply(204);

request(groupCreation, function (error, response, body) {
done();
});
});

it('should send its value to the Context Broker', function (done) {
request(optionsMeasure, function (error, result, body) {
contextBrokerMock.done();
done();
});
});

describe('When a CB notification with a command arrive to the Agent for a device with the HTTP protocol', function () {
const commandOptions = {
url: 'http://localhost:' + config.iota.server.port + '/notify',
method: 'POST',
json: utils.readExampleFile('./test/unit/ngsiv2/contextRequests/notifyCommand.json'),
headers: {
'fiware-service': 'smartgondor',
'fiware-servicepath': '/gardens'
}
};
beforeEach(function () {
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus9.json')
)
.reply(204);
contextBrokerMock
.matchHeader('fiware-service', 'smartgondor')
.matchHeader('fiware-servicepath', '/gardens')
.post(
'/v2/entities?options=upsert',
utils.readExampleFile('./test/unit/ngsiv2/contextRequests/updateStatus10.json')
)
.reply(204);
mockedClientServer = nock('http://localhost:9876')
.post('/command', function (body) {
return body.cmd1 || body.cmd1.data || body.cmd1.data === 22;
})
.reply(200, '{"cmd1":{"data":"22"}}');
});
it('should return a 200 OK without errors', function (done) {
request(optionsMeasure, function (error, result, body) {
request(commandOptions, function (error, response, body) {
should.not.exist(error);
response.statusCode.should.equal(200);
contextBrokerMock.done();
done();
});
});
});
});
});
});
64 changes: 64 additions & 0 deletions test/unit/ngsiv2/contextRequests/notifyCommand.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"subscriptionId": "60b0cedd497e8b681d40b58e",
"data": [{
"id": "123456abcdefg",
"type": "cmd1Execution",
"targetEntityId": {
"type": "Text",
"value": "TheLightType:JSON_UNPROVISIONED",
"metadata": {}
},
"targetEntityType": {
"type": "Text",
"value": "TheLightType",
"metadata": {}
},
"execTs": {
"type": "DateTime",
"value": "2020-05-27T00:00:00.000Z",
"metadata": {}
},
"cmd": {
"type": "Text",
"value": "cmd1",
"metadata": {}
},
"params": {
"type": "Text",
"value": { "data": "22" },
"metadata": {}
},
"status": {
"type": "Text",
"value": "FORWARDED",
"metadata": {}
},
"info": {
"type": "Text",
"value": null,
"metadata": {}
},
"onDelivered": {
"type": "Request"
},
"onOk": {
"type": "Request"
},
"onError": {
"type": "Request"
},
"onInfo": {
"type": "Request"
},
"cmdExecution": {
"type": "value",
"value": true,
"metadata": {}
},
"dateExpiration": {
"type": "DateTime",
"value": "2030-05-27T20:00:00.000Z",
"metadata": {}
}
}]
}
Loading