Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide a WHIP endpoint for audio from external devices #99

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"node": ">=18.15.0"
},
"dependencies": {
"@fastify/bearer-auth": "^9.4.0",
"@fastify/cors": "^8.2.0",
"@fastify/swagger": "^8.3.1",
"@fastify/swagger-ui": "^1.5.0",
Expand Down
10 changes: 9 additions & 1 deletion src/api.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import api from './api';
import { CoreFunctions } from './api_productions_core_functions';
import { ConnectionQueue } from './connection_queue';
import { ProductionManager } from './production_manager';

jest.mock('./db_manager');

describe('api', () => {
it('responds with hello, world!', async () => {
const productionManager = new ProductionManager();
const connectionQueue = new ConnectionQueue();
const server = await api({
title: 'my awesome service',
whipApiKey: 'apikeyindev',
smbServerBaseUrl: 'http://localhost',
endpointIdleTimeout: '60'
endpointIdleTimeout: '60',
productionManager,
coreFunctions: new CoreFunctions(productionManager, connectionQueue)
});
const response = await server.inject({
method: 'GET',
Expand Down
33 changes: 25 additions & 8 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import { Static, Type } from '@sinclair/typebox';
import { FastifyPluginCallback } from 'fastify';
import { getApiProductions } from './api_productions';
import apiProductions, { ApiProductionsOptions } from './api_productions';
import apiWhip, { ApiWhipOptions } from './api_whip';
import { ProductionManager } from './production_manager';

Check warning on line 10 in src/api.ts

View workflow job for this annotation

GitHub Actions / lint

'ProductionManager' is defined but never used
import { CoreFunctions } from './api_productions_core_functions';

Check warning on line 11 in src/api.ts

View workflow job for this annotation

GitHub Actions / lint

'CoreFunctions' is defined but never used

const HelloWorld = Type.String({
description: 'The magical words!'
Expand Down Expand Up @@ -37,20 +40,24 @@
next();
};

export interface ApiOptions {
export interface ApiGeneralOptions {
title: string;
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
}
export type ApiOptions = ApiGeneralOptions &
ApiProductionsOptions &
ApiWhipOptions;

export default async (opts: ApiOptions) => {
const api = fastify({
ignoreTrailingSlash: true
}).withTypeProvider<TypeBoxTypeProvider>();

// register the cors plugin, configure it for better security
api.register(cors);
api.register(cors, {
methods: ['OPTIONS', 'GET', 'POST', 'PATCH', 'DELETE'],
allowedHeaders: ['Content-Type', 'Authorization', 'x-jwt'],
exposedHeaders: ['Content-Type', 'Location']
});

// register the swagger plugins, it will automagically do magic
api.register(swagger, {
Expand All @@ -68,11 +75,21 @@

api.register(healthcheck, { title: opts.title });
// register other API routes here
api.register(await getApiProductions(), {
api.register(apiProductions, {
prefix: 'api/v1',
smbServerBaseUrl: opts.smbServerBaseUrl,
endpointIdleTimeout: opts.endpointIdleTimeout,
smbServerApiKey: opts.smbServerApiKey
smbServerApiKey: opts.smbServerApiKey,
productionManager: opts.productionManager,
coreFunctions: opts.coreFunctions
});
api.register(apiWhip, {
prefix: 'whip',
whipApiKey: opts.whipApiKey,
smbServerBaseUrl: opts.smbServerBaseUrl,
endpointIdleTimeout: opts.endpointIdleTimeout,
smbServerApiKey: opts.smbServerApiKey,
coreFunctions: opts.coreFunctions
});

return api;
Expand Down
20 changes: 5 additions & 15 deletions src/api_productions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@ import {
} from './models';
import { SmbProtocol } from './smb';
import { ProductionManager } from './production_manager';
import dotenv from 'dotenv';
import { v4 as uuidv4 } from 'uuid';
import { ConnectionQueue } from './connection_queue';
import { CoreFunctions } from './api_productions_core_functions';
import { Log } from './log';
dotenv.config();

const productionManager = new ProductionManager();
const connectionQueue = new ConnectionQueue();
const coreFunctions = new CoreFunctions(productionManager, connectionQueue);

export function checkUserStatus() {
productionManager.checkUserStatus();
}

export interface ApiProductionsOptions {
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
productionManager: ProductionManager;
coreFunctions: CoreFunctions;
}

const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
Expand All @@ -50,6 +41,8 @@ const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
).toString();
const smb = new SmbProtocol();
const smbServerApiKey = opts.smbServerApiKey || '';
const productionManager = opts.productionManager;
const coreFunctions = opts.coreFunctions;

fastify.post<{
Body: NewProduction;
Expand Down Expand Up @@ -667,7 +660,4 @@ const apiProductions: FastifyPluginCallback<ApiProductionsOptions> = (
next();
};

export async function getApiProductions() {
await productionManager.load();
return apiProductions;
}
export default apiProductions;
91 changes: 91 additions & 0 deletions src/api_productions_core_functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,97 @@
return sdpOffer;
}

/**
* When handshake is initiated by the client (e.g. with WHIP)
*/
async createConnectionFromOffer(
productionId: string,
lineId: string,
endpointDescription: SmbEndpointDescription,
username: string,
endpointId: string,
sessionId: string,
offer: string,
smb: SmbProtocol,
smbServerUrl: string,
smbServerApiKey: string,
smbConferenceId: string
): Promise<string> {
const ssrcs: MediaStreamsInfoSsrc[] = [];
endpointDescription.audio.ssrcs.forEach((ssrcsNr) => {
ssrcs.push({
ssrc: ssrcsNr.toString(),
cname: uuidv4(),
mslabel: uuidv4(),
label: uuidv4()
});
});
const endpointMediaStreamInfo = {
audio: {
ssrcs: ssrcs
}
};
const connection = new Connection(
username,
endpointMediaStreamInfo,
endpointDescription,
endpointId
);

const parsedOffer = parse(offer);
const answer: SessionDescription = connection.createAnswer(offer);
const sdpAnswer: string = write(answer);

if (sdpAnswer) {
for (const media of parsedOffer.media) {
if (media.type === 'audio') {
endpointDescription.audio.ssrcs = [];
media.ssrcs &&
media.ssrcs
.filter((ssrcs) => ssrcs.attribute === 'msid')
.forEach((ssrcs) =>
endpointDescription.audio.ssrcs.push(parseInt(`${ssrcs.id}`))
);
}
}
for (const media of answer.media) {
if (media.type === 'audio') {
endpointDescription.audio['payload-type'].id = media.rtp[0].payload;
endpointDescription.audio['rtp-hdrexts'] = [];
media.ext &&
media.ext.forEach((ext: any) =>

Check warning on line 134 in src/api_productions_core_functions.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
endpointDescription.audio['rtp-hdrexts'].push({
id: ext.value,
uri: ext.uri
})
);
}
}
// Configure endpoint
await smb.configureEndpoint(
smbServerUrl,
smbConferenceId,
endpointId,
endpointDescription,
smbServerApiKey
);

this.productionManager.createUserSession(
productionId,
lineId,
sessionId,
username
);
this.productionManager.updateUserEndpoint(
sessionId,
endpointId,
endpointDescription
);
}

return sdpAnswer;
}

async createEndpoint(
smb: SmbProtocol,
smbServerUrl: string,
Expand Down Expand Up @@ -275,7 +366,7 @@
return allLinesResponse;
}

private toStringIfNumber(value: any): string {

Check warning on line 369 in src/api_productions_core_functions.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
if (typeof value === 'number') {
return String(value);
} else if (typeof value === 'string') {
Expand Down
126 changes: 126 additions & 0 deletions src/api_whip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { FastifyPluginCallback } from 'fastify';
import bearerAuthPlugin from '@fastify/bearer-auth';

Check warning on line 2 in src/api_whip.ts

View workflow job for this annotation

GitHub Actions / lint

'bearerAuthPlugin' is defined but never used
import { ErrorResponse } from './models';
import { Type } from '@sinclair/typebox';
import { v4 as uuidv4 } from 'uuid';
import { CoreFunctions } from './api_productions_core_functions';
import { SmbProtocol } from './smb';

export interface ApiWhipOptions {
whipApiKey: string;
smbServerBaseUrl: string;
endpointIdleTimeout: string;
smbServerApiKey?: string;
coreFunctions: CoreFunctions;
}

const apiWhip: FastifyPluginCallback<ApiWhipOptions> = (
fastify,
opts,
next
) => {
fastify.addContentTypeParser(
'application/sdp',
{ parseAs: 'string' },
(req, body, done) => {
done(null, body);
}
);

fastify.addContentTypeParser(
'application/trickle-ice-sdpfrag',
{ parseAs: 'string' },
(req, body, done) => {
done(null, body);
}
);

const smbServerUrl = new URL(
'/conferences/',
opts.smbServerBaseUrl
).toString();
const smb = new SmbProtocol();
const smbServerApiKey = opts.smbServerApiKey || '';
const coreFunctions = opts.coreFunctions;

fastify.post<{
Params: { productionId: string; lineId: string; username: string };
Body: string;
}>(
'/production/:productionId/line/:lineId/username/:username',
{
schema: {
description:
'Initiate ingest of an external source to a production and line',
body: Type.String(),
params: {
productionId: Type.String(),
lineId: Type.String(),
username: Type.String()
},
response: {
201: Type.String(),
500: ErrorResponse
}
}
},
async (request, reply) => {
try {
const { productionId, lineId, username } = request.params;
const sdpOffer = request.body;
const sessionId: string = uuidv4();

const smbConferenceId = await coreFunctions.createConferenceForLine(
smb,
smbServerUrl,
smbServerApiKey,
productionId,
lineId
);
const endpointId: string = uuidv4();
const endpoint = await coreFunctions.createEndpoint(
smb,
smbServerUrl,
smbServerApiKey,
smbConferenceId,
endpointId,
true,
true,
parseInt(opts.endpointIdleTimeout, 10)
);
if (!endpoint.audio) {
throw new Error('Missing audio when creating sdp offer for endpoint');
}
if (!endpoint.audio.ssrcs) {
throw new Error('Missing ssrcs when creating sdp offer for endpoint');
}

const sdpAnswer = await coreFunctions.createConnectionFromOffer(
productionId,
lineId,
endpoint,
username,
endpointId,
sessionId,
sdpOffer,
smb,
smbServerUrl,
smbServerApiKey,
smbConferenceId
);
const locationUrl = `/whip/session/${sessionId}`;
reply.headers({
'Content-Type': 'application/sdp',
Location: locationUrl
});
reply.code(201).send(sdpAnswer);
} catch (err) {
console.error(err);
reply.code(500).send({ message: 'Unhandled error: ' + err });
}
}
);
next();
};

export default apiWhip;
Loading
Loading