Skip to content

Commit

Permalink
Reduce thrashing during feature ingestion and tiling (#56)
Browse files Browse the repository at this point in the history
* Remove the concept of "separate metadata"

This was an extra level of attribute indirection (features point
to metadata records which point to key and value strings) which was
intended to reduce the size of temporary storage for features with
large numbers of attributes that were also spread across large numbers
of tiles at maxzoom.

For other kinds of features, the extra indirection slowed things down
instead, and, especially when maxzoom guessing was being used, many more
features were having their metadata externalized than could actually
benefit from it.

* Shave a few bytes off temporary files by using more unsigned integers

* Flush stderr after logging progress

* Revert "Shave a few bytes off temporary files by using more unsigned integers"

This reverts commit eef2908.

* Limit the size of the string pools and trees to fit in memory

* Add missing #include

* Move the string pool and search tree from mmap to allocated memory

* Sort in allocated rather than mapped memory too

* Also use pread instead of mapping to read in the data to sort

* When the pool gets too big, switch to just the file, not memory

* Switch string pool from memory to disk when memory is 10% full

* Add to-memory versions of the serialization functions

* Crashy work in progress toward compression

* Fix the pointer bug that was causing the crash

* Serialize features into memory rather than straight to disk

* Compress individual features in the temporary files

* Don't need to store the length of the geometry

* Remove per-feature compression; move minzoom back into the object

* Start adding a stream compressor object

* Track file position within fwrite_check()

* Add compressed stream writer functions

* Pull the writing of the serialized feature out to the callers

* Starting toward compression again from a different point

* Hook up more compression functions

* Remove unused code from the other day

* Make enough deflate calls to flush out all the buffered data

* Start on decompression

* Tile number is uncompressed, tile content is compressed

* Work on alternating compressed and uncompressed in decompression

* Closer, but still doesn't work

* Sort of works

* Works until we get to concatenated tiles

* More attempts that don't work

* One bug down

* It made a tileset!

* Handle nonzero initial zooms

* Fix seeking within compressed feature streams

* Tests pass!

* Remove debug spew

* Oops: remember to delete the temporary files so they don't hang around

* Test that fails with the current compression code

* Properly account for bytes read while closing the compressed stream

* Limit the number of warnings about bad label points

* A little more armor when closing decompression

* This time for sure

* A different, less fragile, test that failed previously with compression

* Move feature stream compression to its own file

* Remove now-unused code to deserialize from a file

* Forgot to add the new files

* Remove a little debugging logging

* Add a couple of comments on what it means to be within decompression

* Fix indentation

* Update changelog. Remove stray debugging comment.
  • Loading branch information
e-n-f authored Feb 14, 2023
1 parent b155b46 commit f2127fe
Show file tree
Hide file tree
Showing 23 changed files with 85,673 additions and 450 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 2.23.0

* Remove the concept of "separate metadata." Features now always directly reference their keys and values rather than going through a second level of indirection.
* Limit the size of the string pool to 1/5 the size of memory, to prevent thrashing during feature ingestion.
* Avoid using writeable memory maps. Instead, explicitly copy data in and out of memory.
* Compress streams of features in the temporary files, to reduce disk usage and I/O latency

## 2.22.0

* Speed up feature dropping by removing unnecessary search for other small features
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ C = $(wildcard *.c) $(wildcard *.cpp)
INCLUDES = -I/usr/local/include -I.
LIBS = -L/usr/local/lib

tippecanoe: geojson.o jsonpull/jsonpull.o tile.o pool.o mbtiles.o geometry.o projection.o memfile.o mvt.o serial.o main.o text.o dirtiles.o pmtiles_file.o plugin.o read_json.o write_json.o geobuf.o flatgeobuf.o evaluator.o geocsv.o csv.o geojson-loop.o json_logger.o visvalingam.o
tippecanoe: geojson.o jsonpull/jsonpull.o tile.o pool.o mbtiles.o geometry.o projection.o memfile.o mvt.o serial.o main.o text.o dirtiles.o pmtiles_file.o plugin.o read_json.o write_json.o geobuf.o flatgeobuf.o evaluator.o geocsv.o csv.o geojson-loop.o json_logger.o visvalingam.o compression.o
$(CXX) $(PG) $(LIBS) $(FINAL_FLAGS) $(CXXFLAGS) -o $@ $^ $(LDFLAGS) -lm -lz -lsqlite3 -lpthread

tippecanoe-enumerate: enumerate.o
Expand All @@ -56,7 +56,7 @@ tippecanoe-enumerate: enumerate.o
tippecanoe-decode: decode.o projection.o mvt.o write_json.o text.o jsonpull/jsonpull.o dirtiles.o pmtiles_file.o
$(CXX) $(PG) $(LIBS) $(FINAL_FLAGS) $(CXXFLAGS) -o $@ $^ $(LDFLAGS) -lm -lz -lsqlite3

tile-join: tile-join.o projection.o pool.o mbtiles.o mvt.o memfile.o dirtiles.o jsonpull/jsonpull.o text.o evaluator.o csv.o write_json.o pmtiles_file.o
tile-join: tile-join.o projection.o mbtiles.o mvt.o memfile.o dirtiles.o jsonpull/jsonpull.o text.o evaluator.o csv.o write_json.o pmtiles_file.o
$(CXX) $(PG) $(LIBS) $(FINAL_FLAGS) $(CXXFLAGS) -o $@ $^ $(LDFLAGS) -lm -lz -lsqlite3 -lpthread

tippecanoe-json-tool: jsontool.o jsonpull/jsonpull.o csv.o text.o geojson-loop.o
Expand Down
269 changes: 269 additions & 0 deletions compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
#ifdef __APPLE__
#define _DARWIN_UNLIMITED_STREAMS
#endif

#include "compression.hpp"
#include "errors.hpp"
#include "protozero/varint.hpp"
#include "serial.hpp"

void decompressor::begin() {
within = true;

zs.zalloc = NULL;
zs.zfree = NULL;
zs.opaque = NULL;
zs.msg = (char *) "";

int d = inflateInit(&zs);
if (d != Z_OK) {
fprintf(stderr, "initialize decompression: %d %s\n", d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}
}

int decompressor::fread(void *p, size_t size, size_t nmemb, std::atomic<long long> *geompos) {
zs.next_out = (Bytef *) p;
zs.avail_out = size * nmemb;

while (zs.avail_out > 0) {
if (zs.avail_in == 0) {
size_t n = ::fread((Bytef *) buf.c_str(), sizeof(char), buf.size(), fp);
if (n == 0) {
if (within) {
fprintf(stderr, "Reached EOF while decompressing\n");
exit(EXIT_IMPOSSIBLE);
} else {
break;
}
}
zs.next_in = (Bytef *) buf.c_str();
zs.avail_in = n;
}

size_t avail_before = zs.avail_in;

if (within) {
int d = inflate(&zs, Z_NO_FLUSH);
*geompos += avail_before - zs.avail_in;

if (d == Z_OK) {
// it made some progress
} else if (d == Z_STREAM_END) {
// it may have made some progress and now we are done
within = false;
break;
} else {
fprintf(stderr, "decompression error %d %s\n", d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}
} else {
size_t n = std::min(zs.avail_in, zs.avail_out);
memcpy(zs.next_out, zs.next_in, n);
*geompos += n;

zs.avail_out -= n;
zs.avail_in -= n;
zs.next_out += n;
zs.next_in += n;
}
}

return (size * nmemb - zs.avail_out) / size;
}

void decompressor::end(std::atomic<long long> *geompos) {
// "within" means that we haven't received end-of-stream yet,
// so consume more compressed data until we get there.
// This can be necessary if the caller knows that it is at
// the end of the feature stream (because it got a 0-length
// feature) but the decompressor doesn't know yet.

if (within) {
while (true) {
if (zs.avail_in == 0) {
size_t n = ::fread((Bytef *) buf.c_str(), sizeof(char), buf.size(), fp);
zs.next_in = (Bytef *) buf.c_str();
zs.avail_in = n;
}

zs.avail_out = 0;

size_t avail_before = zs.avail_in;
int d = inflate(&zs, Z_NO_FLUSH);
*geompos += avail_before - zs.avail_in;

if (d == Z_STREAM_END) {
break;
}
if (d == Z_OK) {
continue;
}

fprintf(stderr, "decompression: got %d, not Z_STREAM_END\n", d);
exit(EXIT_IMPOSSIBLE);
}

within = false;
}

int d = inflateEnd(&zs);
if (d != Z_OK) {
fprintf(stderr, "end decompression: %d %s\n", d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}
}

int decompressor::deserialize_ulong_long(unsigned long long *zigzag, std::atomic<long long> *geompos) {
*zigzag = 0;
int shift = 0;

while (1) {
char c;
if (fread(&c, sizeof(char), 1, geompos) != 1) {
return 0;
}

if ((c & 0x80) == 0) {
*zigzag |= ((unsigned long long) c) << shift;
shift += 7;
break;
} else {
*zigzag |= ((unsigned long long) (c & 0x7F)) << shift;
shift += 7;
}
}

return 1;
}

int decompressor::deserialize_long_long(long long *n, std::atomic<long long> *geompos) {
unsigned long long zigzag = 0;
int ret = deserialize_ulong_long(&zigzag, geompos);
*n = protozero::decode_zigzag64(zigzag);
return ret;
}

int decompressor::deserialize_int(int *n, std::atomic<long long> *geompos) {
long long ll = 0;
int ret = deserialize_long_long(&ll, geompos);
*n = ll;
return ret;
}

int decompressor::deserialize_uint(unsigned *n, std::atomic<long long> *geompos) {
unsigned long long v;
deserialize_ulong_long(&v, geompos);
*n = v;
return 1;
}

void compressor::begin() {
zs.zalloc = NULL;
zs.zfree = NULL;
zs.opaque = NULL;
zs.msg = (char *) "";

int d = deflateInit(&zs, Z_DEFAULT_COMPRESSION);
if (d != Z_OK) {
fprintf(stderr, "initialize compression: %d %s\n", d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}
}

void compressor::compressor::end(std::atomic<long long> *fpos, const char *fname) {
std::string buf;
buf.resize(5000);

if (zs.avail_in != 0) {
fprintf(stderr, "compression end called with data available\n");
exit(EXIT_IMPOSSIBLE);
}

zs.next_in = (Bytef *) buf.c_str();
zs.avail_in = 0;

while (true) {
zs.next_out = (Bytef *) buf.c_str();
zs.avail_out = buf.size();

int d = deflate(&zs, Z_FINISH);
::fwrite_check(buf.c_str(), sizeof(char), zs.next_out - (Bytef *) buf.c_str(), fp, fpos, fname);

if (d == Z_OK || d == Z_BUF_ERROR) {
// it can take several calls to flush out all the buffered data
continue;
}

if (d != Z_STREAM_END) {
fprintf(stderr, "%s: finish compression: %d %s\n", fname, d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}

break;
}

zs.next_out = (Bytef *) buf.c_str();
zs.avail_out = buf.size();

int d = deflateEnd(&zs);
if (d != Z_OK) {
fprintf(stderr, "%s: end compression: %d %s\n", fname, d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}

::fwrite_check(buf.c_str(), sizeof(char), zs.next_out - (Bytef *) buf.c_str(), fp, fpos, fname);
}

int compressor::fclose() {
return ::fclose(fp);
}

void compressor::fwrite_check(const char *p, size_t size, size_t nmemb, std::atomic<long long> *fpos, const char *fname) {
std::string buf;
buf.resize(size * nmemb * 2 + 200);

zs.next_in = (Bytef *) p;
zs.avail_in = size * nmemb;

while (zs.avail_in > 0) {
zs.next_out = (Bytef *) buf.c_str();
zs.avail_out = buf.size();

int d = deflate(&zs, Z_NO_FLUSH);
if (d != Z_OK) {
fprintf(stderr, "%s: deflate: %d %s\n", fname, d, zs.msg);
exit(EXIT_IMPOSSIBLE);
}

::fwrite_check(buf.c_str(), sizeof(char), zs.next_out - (Bytef *) buf.c_str(), fp, fpos, fname);
}
}

void compressor::serialize_ulong_long(unsigned long long val, std::atomic<long long> *fpos, const char *fname) {
while (1) {
unsigned char b = val & 0x7F;
if ((val >> 7) != 0) {
b |= 0x80;
fwrite_check((const char *) &b, 1, 1, fpos, fname);
val >>= 7;
} else {
fwrite_check((const char *) &b, 1, 1, fpos, fname);
break;
}
}
}

void compressor::serialize_long_long(long long val, std::atomic<long long> *fpos, const char *fname) {
unsigned long long zigzag = protozero::encode_zigzag64(val);

serialize_ulong_long(zigzag, fpos, fname);
}

void compressor::serialize_int(int val, std::atomic<long long> *fpos, const char *fname) {
serialize_long_long(val, fpos, fname);
}

void compressor::serialize_uint(unsigned val, std::atomic<long long> *fpos, const char *fname) {
serialize_ulong_long(val, fpos, fname);
}
58 changes: 58 additions & 0 deletions compression.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#ifdef __APPLE__
#define _DARWIN_UNLIMITED_STREAMS
#endif

#include <stdio.h>
#include <string>
#include <atomic>
#include <zlib.h>

struct decompressor {
FILE *fp = NULL;
z_stream zs;
std::string buf;

// from begin() to receiving end-of-stream
bool within = false;

decompressor(FILE *f) {
fp = f;
buf.resize(5000);

zs.next_in = (Bytef *) buf.c_str();
zs.avail_in = 0;
}

decompressor() {
}

void begin();
int fread(void *p, size_t size, size_t nmemb, std::atomic<long long> *geompos);
void end(std::atomic<long long> *geompos);
int deserialize_ulong_long(unsigned long long *zigzag, std::atomic<long long> *geompos);
int deserialize_long_long(long long *n, std::atomic<long long> *geompos);
int deserialize_int(int *n, std::atomic<long long> *geompos);
int deserialize_uint(unsigned *n, std::atomic<long long> *geompos);
};

struct compressor {
FILE *fp = NULL;
z_stream zs;

compressor(FILE *f) {
fp = f;
}

compressor() {
}

void begin();
void end(std::atomic<long long> *fpos, const char *fname);
int fclose();
void fwrite_check(const char *p, size_t size, size_t nmemb, std::atomic<long long> *fpos, const char *fname);
void serialize_ulong_long(unsigned long long val, std::atomic<long long> *fpos, const char *fname);

void serialize_long_long(long long val, std::atomic<long long> *fpos, const char *fname);
void serialize_int(int val, std::atomic<long long> *fpos, const char *fname);
void serialize_uint(unsigned val, std::atomic<long long> *fpos, const char *fname);
};
Loading

0 comments on commit f2127fe

Please sign in to comment.