Skip to content

Commit

Permalink
Improve S3 Loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
JJK801 committed Jan 8, 2025
1 parent 2d0bb5e commit fb3c0f1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig, ListObjectsV2Command, ListObjectsV2Output } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
Expand All @@ -10,12 +15,13 @@ import * as os from 'node:os'

import { DirectoryLoader } from 'langchain/document_loaders/fs/directory'
import { JSONLoader } from 'langchain/document_loaders/fs/json'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { PDFLoader } from '@langchain/community/document_loaders/fs/pdf'
import { DocxLoader } from '@langchain/community/document_loaders/fs/docx'
import { TextLoader } from 'langchain/document_loaders/fs/text'
import { TextSplitter } from 'langchain/text_splitter'

import { CSVLoader } from '../Csv/CsvLoader'

class S3_DocumentLoaders implements INode {
label: string
name: string
Expand Down Expand Up @@ -151,11 +157,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let credentials: S3ClientConfig['credentials'] | undefined

if (nodeData.credential) {
Expand Down Expand Up @@ -241,11 +242,11 @@ class S3_DocumentLoaders implements INode {
'.csv': (path) => new CSVLoader(path),
'.docx': (path) => new DocxLoader(path),
'.pdf': (path) =>
pdfUsage === 'perFile'
? // @ts-ignore
new PDFLoader(path, { splitPages: false, pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') })
: // @ts-ignore
new PDFLoader(path, { pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js') }),
new PDFLoader(path, {
splitPages: pdfUsage !== 'perFile',
// @ts-ignore
pdfjs: () => import('pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js')
}),
'.aspx': (path) => new TextLoader(path),
'.asp': (path) => new TextLoader(path),
'.cpp': (path) => new TextLoader(path), // C++
Expand Down Expand Up @@ -284,63 +285,16 @@ class S3_DocumentLoaders implements INode {
true
)

let docs = []

if (textSplitter) {
let splittedDocs = await loader.load()
splittedDocs = await textSplitter.splitDocuments(splittedDocs)
docs.push(...splittedDocs)
} else {
docs = await loader.load()
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
}
let docs = await handleDocumentLoaderDocuments(loader, textSplitter)

// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
return handleDocumentLoaderOutput(docs, output)
} catch (e: any) {
fsDefault.rmSync(tempDir, { recursive: true })
throw new Error(`Failed to load data from bucket ${bucketName}: ${e.message}`)
} finally {
// remove the temp directory before returning docs
fsDefault.rmSync(tempDir, { recursive: true })
}
}
}
Expand Down
65 changes: 12 additions & 53 deletions packages/components/nodes/documentloaders/S3File/S3File.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { omit } from 'lodash'
import { ICommonObject, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { S3Loader } from '@langchain/community/document_loaders/web/s3'
import {
Expand All @@ -8,7 +7,13 @@ import {
SkipInferTableTypes,
HiResModelName
} from '@langchain/community/document_loaders/fs/unstructured'
import { getCredentialData, getCredentialParam, handleEscapeCharacters } from '../../../src/utils'
import {
getCredentialData,
getCredentialParam,
handleDocumentLoaderDocuments,
handleDocumentLoaderMetadata,
handleDocumentLoaderOutput
} from '../../../src/utils'
import { S3Client, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3'
import { getRegions, MODEL_TYPE } from '../../../src/modelLoader'
import { Readable } from 'node:stream'
Expand Down Expand Up @@ -483,11 +488,6 @@ class S3_DocumentLoaders implements INode {
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string
const output = nodeData.outputs?.output as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let credentials: S3ClientConfig['credentials'] | undefined

if (nodeData.credential) {
Expand Down Expand Up @@ -572,56 +572,15 @@ class S3_DocumentLoaders implements INode {

const unstructuredLoader = new UnstructuredLoader(filePath, obj)

let docs = await unstructuredLoader.load()

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata,
[sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey
},
omitMetadataKeys
)
}))
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata,
[sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey
},
omitMetadataKeys
)
}))
}
let docs = await handleDocumentLoaderDocuments(unstructuredLoader)

fsDefault.rmSync(path.dirname(filePath), { recursive: true })
docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata, sourceIdKey)

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
return handleDocumentLoaderOutput(docs, output)
} catch {
fsDefault.rmSync(path.dirname(filePath), { recursive: true })
throw new Error(`Failed to load file ${filePath} using unstructured loader.`)
} finally {
fsDefault.rmSync(path.dirname(filePath), { recursive: true })
}
}

Expand Down

0 comments on commit fb3c0f1

Please sign in to comment.