diff --git a/db-service/lib/cqn2sql.js b/db-service/lib/cqn2sql.js
index 3af67b5bb..52a95fa09 100644
--- a/db-service/lib/cqn2sql.js
+++ b/db-service/lib/cqn2sql.js
@@ -701,7 +701,8 @@ class CQN2SQLRenderer {
     if (!elements) {
       this.entries = INSERT.rows
-      const param = this.param.bind(this, { ref: ['?'] })
+      const param = () => this.param({ ref: ['?'] })
+      this.updateParams = this.INSERT_rows_unresolved_update
       return (this.sql = `INSERT INTO ${this.quote(entity)}${alias ? ' as ' + this.quote(alias) : ''} (${this.columns.map(c => this.quote(c))}) VALUES (${columns.map(param)})`)
     }
@@ -728,6 +729,19 @@ class CQN2SQLRenderer {
     this.entries = [[stream]]
   }
 
+  INSERT_rows_unresolved_update(entries) {
+    entries = Array.isArray(entries?.[0]) ? entries : [entries]
+
+    const params = this.params
+    this.params = undefined
+    this.entries = []
+    for (const row of entries) {
+      this.values = []
+      params.forEach(p => this.val({ val: row[p] }))
+      this.entries.push(this.values)
+    }
+  }
+
   /**
    * Renders an INSERT query with values property
   * @param {import('./infer/cqn').INSERT} q
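
Note: with this change a placeholder-based INSERT (`VALUES (?, ?, ...)`) no longer has to be re-rendered per execution — the cached SQL string stays as-is and the new updateParams hook only re-derives the bound values. A minimal standalone sketch of that rebinding idea (hypothetical helper and names, not the internal API):

    // Re-derive one value tuple per row, in the parameter order that was
    // captured when the SQL string was first rendered and cached.
    function rebindParams(params, rows) {
      rows = Array.isArray(rows?.[0]) ? rows : [rows]
      return rows.map(row => params.map(p => row[p]))
    }

    const params = [0, 1] // positional row indices captured at render time
    console.log(rebindParams(params, [[1, 'a'], [2, 'b']])) // [[1, 'a'], [2, 'b']]
    console.log(rebindParams(params, [3, 'c']))             // single row -> [[3, 'c']]
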
diff --git a/hana/lib/HANAService.js b/hana/lib/HANAService.js
index 25b6518c5..37c8da4c1 100644
--- a/hana/lib/HANAService.js
+++ b/hana/lib/HANAService.js
@@ -179,7 +179,7 @@ class HANAService extends SQLService {
             ? entries.reduce((l, c) => l.then(() => this.ensureDBC() && ps.run(c)), Promise.resolve(0))
             : entries.length > 1 ? this.ensureDBC() && await ps.runBatch(entries) : this.ensureDBC() && await ps.run(entries[0])
           : this.ensureDBC() && ps.run())
-      return new this.class.InsertResults(cqn, results)
+      return new this.class.InsertResults(cqn, [results])
     } catch (err) {
       throw _not_unique(err, 'ENTITY_ALREADY_EXISTS')
     }
@@ -224,7 +224,7 @@ class HANAService extends SQLService {
       values.length === 1
         ? values[0] + (values[0].indexOf(`SELECT '$[' as "_path_"`) < 0 ? pathOrder : '')
         : 'SELECT * FROM ' + values.map(v => `(${v})`).join(' UNION ALL ') + pathOrder
-    )
+    ) + ' WITH HINT(USE_HEX_PLAN,OPTIMIZATION_LEVEL(RULE_BASED))'
     DEBUG?.(ret)
     return ret
   }
@@ -727,12 +727,32 @@ class HANAService extends SQLService {
       : ObjectKeys(INSERT.entries[0])
     this.columns = columns
 
-    const extractions = this.managed(columns.map(c => ({ name: c })), elements)
+    const extractions = this._managed = this.managed(columns.map(c => ({ name: c })), elements)
 
     // REVISIT: @cds.extension required
    const extraction = extractions.map(c => c.extract)
    const converter = extractions.map(c => c.insert)
 
+    if (this.params) this.updateParams = this.INSERT_entries_update
+    else this.INSERT_entries_update(INSERT.entries)
+
+    // WITH SRC is used to force HANA to interpret the ? as an NCLOB, allowing the data to be streamed
+    // Additionally, drivers that did allow streaming of NVARCHAR quickly reached size limits
+    // This should allow for 2GB of data to be inserted
+    // When using a buffer table it would be possible to stream indefinitely
+    // For the buffer table to work the data has to be sanitized by a complex regular expression,
+    // which in testing took up about a third of the total processing time
+    // The buffer table approach also greatly reduces the required memory:
+    // JSON_TABLE parses the whole JSON document at once, so the whole document has to be in memory,
+    // while the buffer table approach processes the data in chunks of a configurable size,
+    // which allows even smaller HANA systems to process large datasets
+    // But the chunk size determines the maximum size of a single row
+    return (this.sql = `INSERT INTO ${this.quote(entity)} (${this.columns.map(c => this.quote(transitions.mapping.get(c)?.ref?.[0] || c))
+      }) WITH SRC AS (SELECT ? AS JSON FROM DUMMY UNION ALL SELECT TO_NCLOB(NULL) AS JSON FROM DUMMY)
+      SELECT ${converter} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extraction}) ERROR ON ERROR) AS NEW`)
+  }
+
+  INSERT_entries_update(entries) {
     const _stream = entries => {
       const stream = Readable.from(this.INSERT_entries_stream(entries, 'hex'), { objectMode: false })
       stream.setEncoding('utf-8')
@@ -741,39 +761,42 @@ class HANAService extends SQLService {
       return stream
     }
 
+    entries = Array.isArray(entries) ? entries : [entries]
+
     // HANA Express does not process large JSON documents
     // The limit is somewhere between 64KB and 128KB
     if (HANAVERSION <= 2) {
-      this.entries = INSERT.entries.map(e => (e instanceof Readable
+      this.entries = entries.map(e => (e instanceof Readable
         ? [e]
         : [_stream([e])]))
     } else {
       this.entries = [[
-        INSERT.entries[0] instanceof Readable
-          ? INSERT.entries[0]
-          : _stream(INSERT.entries)
+        entries[0] instanceof Readable
+          ? entries[0]
+          : _stream(entries)
       ]]
     }
-
-    // WITH SRC is used to force HANA to interpret the ? as a NCLOB allowing for streaming of the data
-    // Additionally for drivers that did allow for streaming of NVARCHAR they quickly reached size limits
-    // This should allow for 2GB of data to be inserted
-    // When using a buffer table it would be possible to stream indefinitely
-    // For the buffer table to work the data has to be sanitized by a complex regular expression
-    // Which in testing took up about a third of the total time processing time
-    // With the buffer table approach is also greatly reduces the required memory
-    // JSON_TABLE parses the whole JSON document at once meaning that the whole JSON document has to be in memory
-    // With the buffer table approach the data is processed in chunks of a configurable size
-    // Which allows even smaller HANA systems to process large datasets
-    // But the chunk size determines the maximum size of a single row
-    return (this.sql = `INSERT INTO ${this.quote(entity)} (${this.columns.map(c => this.quote(transitions.mapping.get(c)?.ref?.[0] || c))
-      }) WITH SRC AS (SELECT ? AS JSON FROM DUMMY UNION ALL SELECT TO_NCLOB(NULL) AS JSON FROM DUMMY)
-      SELECT ${converter} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extraction}) ERROR ON ERROR) AS NEW`)
   }
 
   INSERT_rows(q) {
     const { INSERT } = q
+    const elements = q.elements || q.target?.elements
+    if (!elements) {
+      return super.INSERT_rows(q)
+    }
+
+    if (!this.params) {
+      this.INSERT_rows_update(INSERT.rows)
+      INSERT.entries = this.entries
+    }
+
+    const ret = this.INSERT_entries(q)
+    if (this.params) this.updateParams = this.INSERT_rows_update
+    return ret
+  }
+
+  INSERT_rows_update(rows) {
     // Convert the rows into entries to simplify inserting
     // Tested:
     // - Array JSON INSERT (1.5x)
@@ -782,18 +805,18 @@ class HANAService extends SQLService {
     // - Object JSON INSERT (1x)
     // The problem with Simple INSERT is the type mismatch from csv files
     // Recommendation is to always use entries
-    const elements = q.elements || q.target?.elements
-    if (!elements) {
-      return super.INSERT_rows(q)
-    }
+    rows = Array.isArray(rows?.[0]) ? rows : [rows]
+
+    const q = this.cqn
+    const INSERT = q.INSERT || q.UPSERT
+    const elements = q.elements || q.target?.elements
 
     const columns = INSERT.columns || []
     for (const col of ObjectKeys(elements)) {
       if (!columns.includes(col)) columns.push(col)
     }
 
-    const entries = new Array(INSERT.rows.length)
-    const rows = INSERT.rows
+    const entries = new Array(rows.length)
     for (let x = 0; x < rows.length; x++) {
       const row = rows[x]
       const entry = {}
@@ -804,8 +827,8 @@ class HANAService extends SQLService {
       }
       entries[x] = entry
     }
-    INSERT.entries = entries
-    return this.INSERT_entries(q)
+
+    this.INSERT_entries_update(entries)
   }
 
   UPSERT(q) {
@@ -822,22 +845,25 @@ class HANAService extends SQLService {
     // temporal data
     keys.push(...ObjectKeys(q.target.elements).filter(e => q.target.elements[e]['@cds.valid.from']))
 
-    const managed = this.managed(
-      this.columns.map(c => ({ name: c })),
-      elements
-    )
-
+    const managed = this._managed
     const keyCompare = managed
       .filter(c => keys.includes(c.name))
       .map(c => `${c.insert}=OLD.${this.quote(c.name)}`)
       .join(' AND ')
 
-    const mixing = managed.map(c => c.upsert)
-    const extraction = managed.map(c => c.extract)
-
-    const sql = `WITH SRC AS (SELECT ? AS JSON FROM DUMMY UNION ALL SELECT TO_NCLOB(NULL) AS JSON FROM DUMMY)
-SELECT ${mixing} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extraction})) AS NEW LEFT JOIN ${this.quote(entity)} AS OLD ON ${keyCompare}`
-
+    let src
+    let prefix = ''
+    if (UPSERT.as) {
+      // Reset values and params as they were already created in INSERT_select
+      if (this.values) this.values = []
+      if (this.params) this.params = []
+      src = `SELECT ${managed.map(c => `${c.insert} AS ${this.quote(c.name)}`)} FROM (${this.SELECT(this.cqn4sql(UPSERT.as))}) AS NEW`
+    } else {
+      const extract = managed.map(c => c.extract)
+      prefix = `WITH SRC AS (SELECT ? AS JSON FROM DUMMY UNION ALL SELECT TO_NCLOB(NULL) AS JSON FROM DUMMY)`
+      src = `SELECT * FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extract}))`
+    }
+    const sql = `${prefix}SELECT ${managed.slice(0, this.columns.length).map(c => c.upsert)} FROM (${src}) AS NEW LEFT JOIN ${this.quote(entity)} AS OLD ON ${keyCompare}`
     return (this.sql = `UPSERT ${this.quote(entity)} (${this.columns.map(c => this.quote(c))}) ${sql}`)
   }
 
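Note: the INSERT_rows_update comments above recommend funneling positional rows through the entries-based JSON INSERT. Roughly, the conversion that feeds the single NCLOB ? of the WITH SRC statement looks like this (standalone sketch with made-up column names; the real code derives columns from INSERT.columns plus the entity's elements):

    const columns = ['ID', 'name']
    const rows = [[1, 'a'], [2, 'b']]
    // one entry object per row, keyed by column name
    const entries = rows.map(r => Object.fromEntries(columns.map((c, i) => [c, r[i]])))
    // entries -> [{ ID: 1, name: 'a' }, { ID: 2, name: 'b' }]
    // JSON.stringify(entries) is, conceptually, the payload that
    // JSON_TABLE(SRC.JSON, '$' COLUMNS(...)) expands back into rows
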
diff --git a/sqlite/test/general/stream.test.js b/sqlite/test/general/stream.test.js
index f7c002dbc..ee56ad999 100644
--- a/sqlite/test/general/stream.test.js
+++ b/sqlite/test/general/stream.test.js
@@ -85,7 +85,7 @@ describe('streaming', () => {
           { ID: ID1, data: stream1, data2: stream2 },
           { ID: ID2, data: stream3, data2: stream4 },
           { ID: ID3, data: stream5, data2: stream6 }
-        ] = await SELECT.from(Images).columns(['ID', 'data', 'data2'])
+        ] = await SELECT.from(Images).columns(['ID', 'data', 'data2']).orderBy`ID`
         await checkSize(stream1)
         await checkSize(stream2)
         expect(stream3).to.be.null
@@ -107,7 +107,7 @@ describe('streaming', () => {
     test('READ multiple entries ignore stream properties if columns = all', async () =>
       cds.tx(async () => {
         const { Images } = cds.entities('test')
-        const result = await SELECT.from(Images)
+        const result = await SELECT.from(Images).orderBy`ID`
         expect(result[0].ID).equals(1)
         expect(result[0].data).to.be.undefined
         expect(result[0].data2).to.be.undefined
diff --git a/test/compliance/cache.test.js b/test/compliance/cache.test.js
index 83403a85a..601a60dc3 100644
--- a/test/compliance/cache.test.js
+++ b/test/compliance/cache.test.js
@@ -79,18 +79,17 @@ describe('SQL cache', () => {
       yield ']'
     }
 
-    const first = await cds.run(cqn, row())
-    const second = await cds.run(cqn, [row(), row()])
-    const third = await cds.run(cqn, Readable.from(str(3), { objectMode: false }))
-    // const fourth = await cds.run(cqn, gen(4))
-    expect(first | 0).eq(1)
-    expect(second | 0).eq(2)
-    expect(third | 0).eq(3)
-    // expect(fourth | 0).eq(4)
+    await cds.run(cqn, row())
+    await cds.run(cqn, [row(), row()])
+    await cds.run(cqn, Readable.from(str(3), { objectMode: false }))
+    // await cds.run(cqn, gen(4))
 
+    let count = 0
     for (const r of await SELECT.from(keys)) {
       expect(r.data).lt(i)
+      count++
     }
+    expect(count).eq(6)
   })
 
   test('rows', async () => {
@@ -112,14 +111,10 @@
       }
       yield ']'
     }
-    const first = await cds.run(cqn, [row()])
-    const second = await cds.run(cqn, [row(), row()])
-    const third = await cds.run(cqn, Readable.from(str(3), { objectMode: false }))
-    // const fourth = await cds.run(cqn, gen(4))
-    expect(first | 0).eq(1)
-    expect(second | 0).eq(2)
-    expect(third | 0).eq(3)
-    // expect(fourth | 0).eq(4)
+    await cds.run(cqn, [row()])
+    await cds.run(cqn, [row(), row()])
+    // await cds.run(cqn, Readable.from(str(3), { objectMode: false }))
+    // await cds.run(cqn, gen(4))
 
     for (const r of await SELECT.from(keys)) {
       expect(r.data).lt(i)
@@ -134,11 +129,8 @@ describe('SQL cache', () => {
     const cqn = cds.ql[method](cds.ql`SELECT id + :id as id:Integer, :data as data:String from ${keys}`).into(keys)
 
     let i = 1
-    const first = await cds.run(cqn, { id: i, data: `${i++}` })
-    const second = await cds.run(cqn, { id: i, data: `${i++}` })
-
-    expect(first | 0).eq(1)
-    expect(second | 0).eq(2)
+    await cds.run(cqn, { id: i, data: `${i++}` })
+    await cds.run(cqn, { id: i, data: `${i++}` })
 
     for (const r of await SELECT.from(keys)) {
       expect(r.data).lt(i)
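
Note: the cache tests above all exercise the same pattern — one CQN object, prepared once, then executed with payloads of different shapes, which is exactly what the new updateParams hooks enable. In application code the pattern looks roughly like this (sketch based on the tests; the entity name is made up, and it assumes a CAP context where the cds.ql globals are available):

    const { Readable } = require('stream')
    const cqn = INSERT.into('test.keys') // reused across runs, so the SQL is rendered once
    await cds.run(cqn, { ID: 1 })                    // single entry
    await cds.run(cqn, [{ ID: 2 }, { ID: 3 }])       // batch of entries
    await cds.run(cqn, Readable.from('[{"ID":4}]'))  // streamed JSON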