Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari committed Jan 13, 2025
1 parent 85b27da commit 25eeff3
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export default [
["ERR_POSTGRES_IDLE_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_CONNECTION_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_LIFETIME_TIMEOUT", Error, "PostgresError"],
["ERR_POSTGRES_INVALID_TRANSACTION_STATE", Error, "PostgresError"],

// S3
["ERR_S3_MISSING_CREDENTIALS", Error],
Expand Down
114 changes: 108 additions & 6 deletions src/js/bun/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,99 @@ init(
},
);

function onQueryFinish(onClose) {
this.queries.delete(onClose);
}
class ConnectionWithState {
pool: ConnectionPool;
connection: ReturnType<typeof createConnection>;
state: "pending" | "connected" | "closed" = "pending";
storedError: Error | null = null;
queries: Set<(err: Error) => void> = new Set();

#onConnected(err, _) {
this.storedError = err;
this.state = err ? "closed" : "connected";
this.pool.release(this);
}
#onClose(err) {
this.state = "closed";
this.connection = null;
this.storedError = err;

// remove from ready connections if its there
this.pool.readyConnections.delete(this);
const queries = new Set(this.queries);
this.queries.clear();
for (const onClose of queries) {
onClose(err);
}

// we need to reconnect
// lets use a retry strategy
// TODO: implement retry strategy, maxLifetime, idleTimeout, connectionTimeout
}
constructor(connectionInfo, pool: ConnectionPool) {
//TODO: maxLifetime, idleTimeout, connectionTimeout
this.connection = createConnection(connectionInfo, this.#onConnected.bind(this), this.#onClose.bind(this));
this.state = "pending";
this.pool = pool;
}

bindQuery(query: Query, onClose: (err: Error) => void) {
this.queries.add(onClose);
// @ts-ignore
query.finally(onQueryFinish.bind(this, onClose));
}
}
class ConnectionPool {
connectionInfo: any;

connections: ConnectionWithState[];
readyConnections: Set<ConnectionWithState>;
waitingQueue: Array<(err: Error | null, result: any) => void> = [];
constructor(connectionInfo) {
this.connectionInfo = connectionInfo;

let max = connectionInfo.max;
if (max && typeof max !== "number") {
throw $ERR_INVALID_ARG_TYPE("max", "number", max);
} else {
max = 10; // same default as postgres.js
}
if (max < 1) {
throw $ERR_INVALID_ARG_VALUE("max", max, "must be greater than 0");
}

this.connections = new Array(max);
for (let i = 0; i < max; i++) {
this.connections[i] = new ConnectionWithState(this.connectionInfo, this);
}
this.readyConnections = new Set();
}

release(connection: ConnectionWithState) {
if (this.waitingQueue.length > 0) {
const pending = this.waitingQueue.shift();
pending?.(null, connection);
} else {
this.readyConnections.add(connection);
}
}

connect(onConnected: (err: Error | null, result: any) => void) {
if (this.readyConnections.size === 0) {
// wait for connection to be released
this.waitingQueue.push(onConnected);
return;
}
// unshift
const first = this.readyConnections.values().next().value;
this.readyConnections.delete(first);
onConnected(null, first);
}
}

function createConnection(
{
hostname,
Expand Down Expand Up @@ -543,6 +636,8 @@ function SQL(o) {
storedErrorForClosedConnection,
connectionInfo = loadOptions(o);

const pool = new ConnectionPool(connectionInfo);

function connectedHandler(query, handle, err) {
if (err) {
return query.reject(err);
Expand Down Expand Up @@ -572,7 +667,7 @@ function SQL(o) {
}

function closedConnectionHandler(query, handle) {
query.reject(storedErrorForClosedConnection || new Error("Connection closed"));
query.reject(storedErrorForClosedConnection || connectionClosedError());
}

function onConnected(err, result) {
Expand Down Expand Up @@ -717,11 +812,18 @@ function SQL(o) {
const sql_with_savepoint = function (strings, ...values) {
return sql(strings, ...values);
};
// dirt copy of the sql object
for (const key in sql) {
sql_with_savepoint[key] = sql[key];
}
// this version accepts savepoints
// allow flush, close, options, then, and asyncDispose to be called on the sql_with_savepoint
sql_with_savepoint.flush = sql.flush;
sql_with_savepoint.close = sql.close;
sql_with_savepoint.options = sql.options;
sql_with_savepoint.then = sql.then;
// begin is not allowed on a transaction we need to use savepoint() instead
sql_with_savepoint.begin = function () {
throw $ERR_POSTGRES_INVALID_TRANSACTION_STATE("cannot call begin on a transaction use savepoint() instead");
};
sql_with_savepoint[Symbol.asyncDispose] = sql[Symbol.asyncDispose];

// this version accepts savepoints with is basically nested transactions
sql_with_savepoint.savepoint = async (fn: TransactionCallback, name?: string) => {
let savepoint_callback = fn;

Expand Down

0 comments on commit 25eeff3

Please sign in to comment.