Skip to content

Commit

Permalink
feat: provide WHIP endpoint for external audio devices
Browse files Browse the repository at this point in the history
  • Loading branch information
birme committed Jun 28, 2024
1 parent 14108d6 commit 8b25f89
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 32 deletions.
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"node": ">=18.15.0"
},
"dependencies": {
"@fastify/bearer-auth": "^9.4.0",
"@fastify/cors": "^8.2.0",
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.5.0",
Expand Down
10 changes: 9 additions & 1 deletion src/api.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import api from './api';
import { CoreFunctions } from './api_productions_core_functions';
import { ConnectionQueue } from './connection_queue';
import { ProductionManager } from './production_manager';

jest.mock('./db_manager');

describe('api', () => {
it('responds with hello, world!', async () => {
const productionManager = new ProductionManager();
const connectionQueue = new ConnectionQueue();
const server = await api({
title: 'my awesome service',
whipApiKey: 'apikeyindev',
smbServerBaseUrl: 'http://localhost',
endpointIdleTimeout: '60'
endpointIdleTimeout: '60',
productionManager,
coreFunctions: new CoreFunctions(productionManager, connectionQueue)
});
const response = await server.inject({
method: 'GET',
Expand Down
33 changes: 25 additions & 8 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import swaggerUI from '@fastify/swagger-ui';
import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import { Static, Type } from '@sinclair/typebox';
import { FastifyPluginCallback } from 'fastify';
import { getApiProductions } from './api_productions';
import apiProductions, { ApiProductionsOptions } from './api_productions';
import apiWhip, { ApiWhipOptions } from './api_whip';
import { ProductionManager } from './production_manager';

Check warning on line 10 in src/api.ts

View workflow job for this annotation

GitHub Actions / lint

'ProductionManager' is defined but never used
import { CoreFunctions } from './api_productions_core_functions';

Check warning on line 11 in src/api.ts

View workflow job for this annotation

GitHub Actions / lint

'CoreFunctions' is defined but never used

const HelloWorld = Type.String({
description: 'The magical words!'
Expand Down Expand Up @@ -37,20 +40,24 @@ const healthcheck: FastifyPluginCallback<HealthcheckOptions> = (
next();
};

export interface ApiOptions {
export interface ApiGeneralOptions {
title: string;
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
}
export type ApiOptions = ApiGeneralOptions &
ApiProductionsOptions &
ApiWhipOptions;

export default async (opts: ApiOptions) => {
const api = fastify({
ignoreTrailingSlash: true
}).withTypeProvider<TypeBoxTypeProvider>();

// register the cors plugin, configure it for better security
api.register(cors);
api.register(cors, {
methods: ['OPTIONS', 'GET', 'POST', 'PATCH', 'DELETE'],
allowedHeaders: ['Content-Type', 'Authorization', 'x-jwt'],
exposedHeaders: ['Content-Type', 'Location']
});

// register the swagger plugins, it will automagically do magic
api.register(swagger, {
Expand All @@ -68,11 +75,21 @@ export default async (opts: ApiOptions) => {

api.register(healthcheck, { title: opts.title });
// register other API routes here
api.register(await getApiProductions(), {
api.register(apiProductions, {
prefix: 'api/v1',
smbServerBaseUrl: opts.smbServerBaseUrl,
endpointIdleTimeout: opts.endpointIdleTimeout,
smbServerApiKey: opts.smbServerApiKey
smbServerApiKey: opts.smbServerApiKey,
productionManager: opts.productionManager,
coreFunctions: opts.coreFunctions
});
api.register(apiWhip, {
prefix: 'whip',
whipApiKey: opts.whipApiKey,
smbServerBaseUrl: opts.smbServerBaseUrl,
endpointIdleTimeout: opts.endpointIdleTimeout,
smbServerApiKey: opts.smbServerApiKey,
coreFunctions: opts.coreFunctions
});

return api;
Expand Down
20 changes: 5 additions & 15 deletions src/api_productions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@ import {
} from './models';
import { SmbProtocol } from './smb';
import { ProductionManager } from './production_manager';
import dotenv from 'dotenv';
import { v4 as uuidv4 } from 'uuid';
import { ConnectionQueue } from './connection_queue';
import { CoreFunctions } from './api_productions_core_functions';
import { Log } from './log';
dotenv.config();

const productionManager = new ProductionManager();
const connectionQueue = new ConnectionQueue();
const coreFunctions = new CoreFunctions(productionManager, connectionQueue);

export function checkUserStatus() {
productionManager.checkUserStatus();
}

export interface ApiProductionsOptions {
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
productionManager: ProductionManager;
coreFunctions: CoreFunctions;
}

const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
Expand All @@ -50,6 +41,8 @@ const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
).toString();
const smb = new SmbProtocol();
const smbServerApiKey = opts.smbServerApiKey || '';
const productionManager = opts.productionManager;
const coreFunctions = opts.coreFunctions;

fastify.post<{
Body: NewProduction;
Expand Down Expand Up @@ -667,7 +660,4 @@ const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
next();
};

export async function getApiProductions() {
await productionManager.load();
return apiProductions;
}
export default apiProductions;
91 changes: 91 additions & 0 deletions src/api_productions_core_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,97 @@ export class CoreFunctions {
return sdpOffer;
}

/**
* When handshake is initiated by the client (e.g. with WHIP)
*/
async createConnectionFromOffer(
productionId: string,
lineId: string,
endpointDescription: SmbEndpointDescription,
username: string,
endpointId: string,
sessionId: string,
offer: string,
smb: SmbProtocol,
smbServerUrl: string,
smbServerApiKey: string,
smbConferenceId: string
): Promise<string> {
const ssrcs: MediaStreamsInfoSsrc[] = [];
endpointDescription.audio.ssrcs.forEach((ssrcsNr) => {
ssrcs.push({
ssrc: ssrcsNr.toString(),
cname: uuidv4(),
mslabel: uuidv4(),
label: uuidv4()
});
});
const endpointMediaStreamInfo = {
audio: {
ssrcs: ssrcs
}
};
const connection = new Connection(
username,
endpointMediaStreamInfo,
endpointDescription,
endpointId
);

const parsedOffer = parse(offer);
const answer: SessionDescription = connection.createAnswer(offer);
const sdpAnswer: string = write(answer);

if (sdpAnswer) {
for (const media of parsedOffer.media) {
if (media.type === 'audio') {
endpointDescription.audio.ssrcs = [];
media.ssrcs &&
media.ssrcs
.filter((ssrcs) => ssrcs.attribute === 'msid')
.forEach((ssrcs) =>
endpointDescription.audio.ssrcs.push(parseInt(`${ssrcs.id}`))
);
}
}
for (const media of answer.media) {
if (media.type === 'audio') {
endpointDescription.audio['payload-type'].id = media.rtp[0].payload;
endpointDescription.audio['rtp-hdrexts'] = [];
media.ext &&
media.ext.forEach((ext: any) =>

Check warning on line 134 in src/api_productions_core_functions.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
endpointDescription.audio['rtp-hdrexts'].push({
id: ext.value,
uri: ext.uri
})
);
}
}
// Configure endpoint
await smb.configureEndpoint(
smbServerUrl,
smbConferenceId,
endpointId,
endpointDescription,
smbServerApiKey
);

this.productionManager.createUserSession(
productionId,
lineId,
sessionId,
username
);
this.productionManager.updateUserEndpoint(
sessionId,
endpointId,
endpointDescription
);
}

return sdpAnswer;
}

async createEndpoint(
smb: SmbProtocol,
smbServerUrl: string,
Expand Down
126 changes: 126 additions & 0 deletions src/api_whip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { FastifyPluginCallback } from 'fastify';
import bearerAuthPlugin from '@fastify/bearer-auth';

Check warning on line 2 in src/api_whip.ts

View workflow job for this annotation

GitHub Actions / lint

'bearerAuthPlugin' is defined but never used
import { ErrorResponse } from './models';
import { Type } from '@sinclair/typebox';
import { v4 as uuidv4 } from 'uuid';
import { CoreFunctions } from './api_productions_core_functions';
import { SmbProtocol } from './smb';

export interface ApiWhipOptions {
whipApiKey: string;
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
coreFunctions: CoreFunctions;
}

const apiWhip: FastifyPluginCallback<ApiWhipOptions> = (
fastify,
opts,
next
) => {
fastify.addContentTypeParser(
'application/sdp',
{ parseAs: 'string' },
(req, body, done) => {
done(null, body);
}
);

fastify.addContentTypeParser(
'application/trickle-ice-sdpfrag',
{ parseAs: 'string' },
(req, body, done) => {
done(null, body);
}
);

const smbServerUrl = new URL(
'/conferences/',
opts.smbServerBaseUrl
).toString();
const smb = new SmbProtocol();
const smbServerApiKey = opts.smbServerApiKey || '';
const coreFunctions = opts.coreFunctions;

fastify.post<{
Params: { productionId: string; lineId: string; username: string };
Body: string;
}>(
'/production/:productionId/line/:lineId/username/:username',
{
schema: {
description:
'Initiate ingest of an external source to a production and line',
body: Type.String(),
params: {
productionId: Type.String(),
lineId: Type.String(),
username: Type.String()
},
response: {
201: Type.String(),
500: ErrorResponse
}
}
},
async (request, reply) => {
try {
const { productionId, lineId, username } = request.params;
const sdpOffer = request.body;
const sessionId: string = uuidv4();

const smbConferenceId = await coreFunctions.createConferenceForLine(
smb,
smbServerUrl,
smbServerApiKey,
productionId,
lineId
);
const endpointId: string = uuidv4();
const endpoint = await coreFunctions.createEndpoint(
smb,
smbServerUrl,
smbServerApiKey,
smbConferenceId,
endpointId,
true,
true,
parseInt(opts.endpointIdleTimeout, 10)
);
if (!endpoint.audio) {
throw new Error('Missing audio when creating sdp offer for endpoint');
}
if (!endpoint.audio.ssrcs) {
throw new Error('Missing ssrcs when creating sdp offer for endpoint');
}

const sdpAnswer = await coreFunctions.createConnectionFromOffer(
productionId,
lineId,
endpoint,
username,
endpointId,
sessionId,
sdpOffer,
smb,
smbServerUrl,
smbServerApiKey,
smbConferenceId
);
const locationUrl = `/whip/session/${sessionId}`;
reply.headers({
'Content-Type': 'application/sdp',
Location: locationUrl
});
reply.code(201).send(sdpAnswer);
} catch (err) {
console.error(err);
reply.code(500).send({ message: 'Unhandled error: ' + err });
}
}
);
next();
};

export default apiWhip;
Loading

0 comments on commit 8b25f89

Please sign in to comment.