Skip to content

Commit

Permalink
Merge pull request #162 from realratchet/master
Browse files Browse the repository at this point in the history
add support for opening for any number of columns
  • Loading branch information
realratchet authored Apr 10, 2024
2 parents 4d58d36 + c1d48f6 commit 0fabe2d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 17 deletions.
4 changes: 2 additions & 2 deletions nimlite/funcs/text_reader/csvparse.nim
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ proc parseSaveField(self: var ReaderObj, dia: Dialect): bool =
if likely(self.fieldLen > 0):
copyMem(field[0].addr, self.field[0].addr, self.fieldLen)

if unlikely(self.fieldCount + 1 >= (uint self.field.high)):
self.field.setLen(self.field.len() * 2)
if unlikely(self.fieldCount + 1 >= (uint self.fields.high)):
self.fields.setLen(self.fields.len() * 2)

if dia.skiptrailingspace:
field = field.strip(leading = false, trailing = true)
Expand Down
76 changes: 76 additions & 0 deletions nimlite/funcs/text_reader/file_tracker.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import std/[lists, options, sugar, os, parseutils]
from ../../utils import implement

const DEFAULT_MAX_FILES_OPEN = 4096

proc readMaxFilesOpen(): int =
  ## Resolves the limit of simultaneously open page files.
  ##
  ## Reads the `MAX_FILES_OPEN` environment variable. The default is kept when
  ## the variable is unset, does not start with an integer, is out of `int`
  ## range, or is not strictly positive. A non-positive limit would leave the
  ## deferred tracker with nothing to evict while still being "over the limit".
  result = DEFAULT_MAX_FILES_OPEN

  var parsed = 0

  try:
    # parseutils.parseInt returns the number of consumed characters (0 when
    # no integer is present) and raises ValueError on overflow.
    if parseInt(os.getEnv("MAX_FILES_OPEN"), parsed, 0) > 0 and parsed > 0:
      result = parsed
  except ValueError:
    discard # out-of-range value: keep the default

# Resolved once at module initialisation and never reassigned afterwards.
let MAX_FILES_OPEN = readMaxFilesOpen()

type
  FileTracker* = ref object of RootObj
    ## Base type for managing the open page files.
    count: int ## number of pages tracked

  FileTrackerSimple = ref object of FileTracker
    ## Fast path: the page count fits within the open-file limit, so one
    ## handle per page is kept.
    pageHandles: seq[File]

  FileTrackerDeferred = ref object of FileTracker
    ## Slow path: there are more pages than the open-file limit allows, so
    ## handles are opened on demand.
    pageHandles: seq[Option[File]] ## per page: the open handle, if any
    pages: seq[string]             ## per page: the path to open
    pagesOpen: int                 ## how many handles are currently open
    openOrder: SinglyLinkedList[int] ## page indices in the order they were opened

proc newFileTracker*(pages: seq[string]): FileTracker =
  ## Creates the tracker that owns the output file of every page.
  ##
  ## - `pages`: one destination path per page; each file is opened with
  ##   `fmWrite`.
  ##
  ## When `pages.len` does not exceed `MAX_FILES_OPEN`, every file is opened
  ## up front and a `FileTrackerSimple` is returned. Otherwise a warning is
  ## written to stderr, every file is created empty, and a
  ## `FileTrackerDeferred` is returned that opens handles on demand.
  ##
  ## Any error raised while opening a file is propagated to the caller.
  if pages.len <= MAX_FILES_OPEN:
    var handles = newSeqOfCap[File](pages.len)

    try:
      for p in pages:
        handles.add(open(p, fmWrite))
    except CatchableError:
      # The tracker is never handed to the caller on failure, so the handles
      # opened so far must be released here or they would leak.
      for h in handles:
        h.close()
      raise

    return FileTrackerSimple(count: pages.len, pageHandles: handles)

  stderr.writeLine("Too many columns in file, using deferred importer instead. This will greatly reduce performance: " & $pages.len & " > " & $MAX_FILES_OPEN & ". You can increase MAX_FILES_OPEN if your OS allows.")

  for p in pages:
    # create empties
    open(p, fmWrite).close()

  return FileTrackerDeferred(count: pages.len, pages: pages, pageHandles: newSeq[Option[File]](pages.len), openOrder: initSinglyLinkedList[int]())

method getHandle(self: FileTrackerDeferred, index: int): File {.base.} =
  ## Returns the handle of page `index`, opening it first if necessary.
  ##
  ## A page that is not open yet is opened with `fmAppend`, stored in its
  ## slot, counted in `pagesOpen` and appended to the tail of `openOrder`.
  let slot = self.pageHandles[index]

  if slot.isNone:
    result = open(self.pages[index], fmAppend)

    self.pageHandles[index] = some(result)

    self.pagesOpen.inc
    self.openOrder.add(newSinglyLinkedNode(index))
  else:
    result = slot.get

method closeHandle(self: FileTrackerDeferred, index: int): void {.base.} =
  ## Flushes and closes the handle of page `index`, if it is open.
  ##
  ## The slot is reset to `none` and `pagesOpen` is decremented. `openOrder`
  ## is not touched here; the caller removes the entry.
  let slot = self.pageHandles[index]

  if slot.isSome:
    let handle = slot.get

    handle.flushFile()
    handle.close()

    self.pageHandles[index] = none(File)
    self.pagesOpen.dec

method `[]`*(self: FileTracker, index: int): File {.base, inline.} = implement("FileTracker.`[]` must be implemented by inheriting class.")

method `[]`(self: FileTrackerSimple, index: int): File =
  ## Every page is permanently open; return its handle directly.
  self.pageHandles[index]

method `[]`(self: FileTrackerDeferred, index: int): File =
  ## Returns an open handle for page `index`, keeping the number of open
  ## files at or below `MAX_FILES_OPEN` by closing the handle that was
  ## opened earliest.
  let slot = self.pageHandles[index]

  # Already open: hand it out without evicting anything. Evicting here would
  # needlessly close another page, or close and reopen this very page.
  if slot.isSome:
    return slot.get

  if self.pagesOpen >= MAX_FILES_OPEN:
    let oldest = self.openOrder.head

    # `head` is nil when nothing is open (possible only with a non-positive
    # limit); there is nothing to evict in that case.
    if oldest != nil:
      self.closeHandle(oldest.value)
      self.openOrder.remove(oldest)

  return self.getHandle(index)

method close*(self: FileTracker): void {.base, inline.} = implement("FileTracker.close must be implemented by inheriting class.")

method close(self: FileTrackerSimple): void =
  ## Closes every page handle. The handles are dropped afterwards so that a
  ## repeated `close` is a no-op instead of closing the same files twice.
  for h in self.pageHandles:
    h.close()

  self.pageHandles.setLen(0)

method close(self: FileTrackerDeferred): void =
  ## Closes every page handle that is currently open and resets the
  ## bookkeeping, so that a repeated `close` is a no-op and a later lookup
  ## reopens the page instead of returning a closed handle.
  for i in 0 ..< self.pageHandles.len:
    if self.pageHandles[i].isSome:
      self.pageHandles[i].get.close()
      self.pageHandles[i] = none(File)

  self.pagesOpen = 0
  self.openOrder = initSinglyLinkedList[int]()

proc len*(self: FileTracker): int =
  ## Number of pages handled by the tracker.
  result = self.count
21 changes: 11 additions & 10 deletions nimlite/funcs/text_reader/paging.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import nimpy as nimpy
import std/[sugar, sequtils, unicode, enumerate, tables]
import encfile, csvparse
import std/[sugar, unicode, tables]
import encfile, csvparse, file_tracker
import ../../[numpy, pickling, ranking, infertypes, pytypes, utils]

type PageType = enum
Expand Down Expand Up @@ -91,17 +91,18 @@ proc isAnyFloat(dt: DataTypes): bool {.inline.} =
proc dumpPageHeader*(
destinations: var seq[string],
nPages: int, nRows: uint, guessDtypes: bool,
longestStr: var seq[uint], ranks: var seq[Rank]
): (seq[File], seq[PageType], uint32) =
let pageFileHandlers = collect(newSeqOfCap(nPages)):
for p in destinations:
open(p, fmWrite)
longestStr: seq[uint], ranks: var seq[Rank]
): (FileTracker, seq[PageType], uint32) =
let pageFileHandlers = newFileTracker(destinations)

var columnDtypes = newSeq[PageType](nPages)
var binput: uint32 = 0

if not guessDtypes:
for idx, (fh, i) in enumerate(zip(pageFileHandlers, longestStr)):
for idx in 0..<pageFileHandlers.len:
let fh = pageFileHandlers[idx]
let i = longestStr[idx]

columnDtypes[idx] = PageType.PG_UNICODE
fh.writeNumpyHeader(endiannessMark & "U" & $i, nRows)
else:
Expand Down Expand Up @@ -217,7 +218,7 @@ proc dumpPageBody*(
obj: var ReaderObj, fh: var BaseEncodedFile,
guessDtypes: bool, nPages: int, rowCount: int, skipEmpty: SkipEmpty,
importFields: var seq[uint],
pageFileHandlers: var seq[File],
pageFileHandlers: var FileTracker,
longestStr: var seq[uint], ranks: var seq[Rank], columnDtypes: var seq[PageType],
binput: var uint32
): (seq[Table[KindObjectND, int]], seq[int]) =
Expand Down Expand Up @@ -352,7 +353,7 @@ proc dumpPageBody*(

proc dumpPageFooter*(
nPages: int, nRows: uint,
pageFileHandlers: var seq[File],
pageFileHandlers: var FileTracker,
columnDtypes: var seq[PageType],
binput: var uint32
): void =
Expand Down
8 changes: 4 additions & 4 deletions nimlite/funcs/text_reader/text_reader.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import nimpy as nimpy
import std/[os, enumerate, sugar, tables, json, options, strutils, paths]
import encfile, csvparse, table, ../../utils, paging, taskargs
import std/[os, enumerate, sugar, tables, options, strutils, paths]
import encfile, csvparse, table, paging, taskargs, file_tracker
import ../../utils
from ../../numpy import newPyPage
from ../../ranking import Rank

Expand Down Expand Up @@ -106,8 +107,7 @@ proc textReaderTask*(task: TaskArgs, page_info: PageInfo): seq[nimpy.PyObject] =

return elems
finally:
for f in pageFileHandlers:
f.close()
pageFileHandlers.close()

finally:
fh.close()
Expand Down
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 11, 1
major, minor, patch = 2023, 11, 2
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)

0 comments on commit 0fabe2d

Please sign in to comment.