Bulk handling of logs #49

Open
GeertJohan opened this issue Mar 4, 2015 · 14 comments

Comments

@GeertJohan

This issue originates from #44 where an external dependency (go-loggly) did buffering of log messages.

One of the key features of log15 is the way Handlers connect and create a handling tree. I think properties such as async, buffering (with max buffersize and flush interval) should be handled by a Handler, and the hook to a logging service shouldn't have to implement buffering itself and should just send logs to the logging service.

Now, of course, the whole purpose of the buffering in go-loggly is so that it can send multiple log entries in a single call to Loggly. This is actually very important, especially when working with huge amounts of logs. But buffering and bulk writes are also important in other handlers. For instance, a colleague of mine (@michaelvlaar) found that some of our applications spend a huge amount of time in the kernel due to writes to files and os.Stdout. So a more generic approach to buffering and handling of bulk log records seems to be an interesting topic. We brainstormed a bit about it and came up with the following idea:

Add an extra interface type:

type BulkHandler interface{
    LogBulk([]*Record) error
}

Some Handlers such as the LogglyHandler and the FileHandler could implement this interface, taking a slice of logs and sending/writing them all at once, or at least as much as possible/efficient per call/write. For instance: Loggly's bulk endpoint states "You can send a maximum of 5MB per batch and up to 1MB per event", so the LogglyHandler will need to take care of that.
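To illustrate the size bookkeeping a LogBulk implementation would have to do, here is a minimal sketch. It assumes the records have already been formatted to bytes; the splitBatches helper and its parameters are hypothetical and not part of log15 or go-loggly. The limits are passed in rather than hard-coded, and separators between events are not counted.

```go
package main

import "fmt"

// splitBatches groups formatted events into batches whose total size stays
// within maxBatch bytes. Events larger than maxEvent (or maxBatch) bytes can
// never be sent, so they are returned separately for the caller to handle.
func splitBatches(events [][]byte, maxBatch, maxEvent int) (batches [][][]byte, rejected [][]byte) {
	var cur [][]byte
	size := 0
	for _, e := range events {
		if len(e) > maxEvent || len(e) > maxBatch {
			rejected = append(rejected, e)
			continue
		}
		// Start a new batch when this event would overflow the current one.
		if size+len(e) > maxBatch && len(cur) > 0 {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, e)
		size += len(e)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches, rejected
}

func main() {
	events := [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cccc"), []byte("toolongevent"),
	}
	// Tiny limits keep the behaviour visible; the Loggly limits quoted above
	// would be 5MB per batch and 1MB per event.
	batches, rejected := splitBatches(events, 8, 6)
	fmt.Println(len(batches), len(rejected))
}
```

With these toy limits the first two events share a batch, the third starts a new one, and the oversized event is rejected.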

Then a new Handler would be added:

type bufferedBulkHandler struct {
    buffer []*Record
    handler Handler
    bufferSize int // maximum number of buffered records; the buffer is flushed when it is full
    flushInterval time.Duration
    forceFlushLevel Level // buffer is flushed directly (while blocking the logging call) after a message with this level (or higher) is added.
}

func BufferedBulkHandler(size int, interval time.Duration, forceFlushLevel Level, handler Handler) Handler

The ForceFlushLevel is actually quite important: there's a good chance a program is crashing when a log with LvlCrit is added, and you wouldn't want that message to be stuck in the buffer when the program crashes. Normal flushing takes place asynchronously, without blocking the Log call.

Maybe in this case (because of the number of options) the BufferedBulkHandler should be an exported type with exported fields, and not a function.
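A rough sketch of the exported-struct variant, covering only the size-triggered and level-triggered flushes. Everything here is an assumption for illustration: Record, Lvl and the level constants are local stand-ins for the log15 types, and countingHandler exists only for the demo. Interval-based and asynchronous flushing are left out, so every flush in this sketch blocks the Log call.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the log15 types so the sketch compiles on its own.
type Lvl int

const (
	LvlCrit Lvl = iota // most severe level has the lowest value, as in log15
	LvlError
	LvlWarn
	LvlInfo
	LvlDebug
)

type Record struct {
	Lvl Lvl
	Msg string
}

// BulkHandler is the interface proposed above.
type BulkHandler interface {
	LogBulk([]*Record) error
}

// BufferedBulkHandler is a hypothetical exported-struct version of the proposal.
type BufferedBulkHandler struct {
	Handler         BulkHandler
	BufferSize      int // flush when this many records are buffered
	ForceFlushLevel Lvl // flush immediately at this level or more severe

	mu     sync.Mutex
	buffer []*Record
}

// Log buffers r and flushes when the buffer is full or r is severe enough.
func (b *BufferedBulkHandler) Log(r *Record) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buffer = append(b.buffer, r)
	if len(b.buffer) >= b.BufferSize || r.Lvl <= b.ForceFlushLevel {
		batch := b.buffer
		b.buffer = nil // the downstream handler now owns the old slice
		return b.Handler.LogBulk(batch)
	}
	return nil
}

// countingHandler records the size of each batch it receives (demo only).
type countingHandler struct{ sizes []int }

func (c *countingHandler) LogBulk(rs []*Record) error {
	c.sizes = append(c.sizes, len(rs))
	return nil
}

func main() {
	sink := &countingHandler{}
	h := &BufferedBulkHandler{Handler: sink, BufferSize: 3, ForceFlushLevel: LvlCrit}
	h.Log(&Record{Lvl: LvlInfo, Msg: "a"})
	h.Log(&Record{Lvl: LvlCrit, Msg: "about to crash"}) // forces a flush of both records
	h.Log(&Record{Lvl: LvlInfo, Msg: "b"})              // stays buffered
	fmt.Println(sink.sizes)
}
```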

In summary, what I propose is:

  • Define the BulkHandler interface
  • Add the BufferedBulkHandler (either as func or exported struct)
  • Implement LogBulk([]*log15.Record) error for the FileHandler and LogglyHandler, and possibly more.

What I'm not sure about is whether BufferedBulkHandler should accept a Handler as argument, and check with a type assertion if the Handler also happens to implement BulkHandler. Or maybe it should just accept BulkHandlers (it doesn't really make sense to first buffer messages, and then handle them one by one through a normal Handler, or does it?).

@ChrisHines
Collaborator

How about:

type bufferedBulkHandler struct {
    buffer []*Record // the buffer is flushed when len(buffer) == cap(buffer)
    handler BulkHandler
    flushInterval time.Duration
    forceFlushLevel Level // buffer is flushed directly (while blocking the logging call) after a message with this level (or higher) is added.
}

We get rid of the bufferSize field as it is redundant with the slice capacity.

What I'm not sure about is whether BufferedBulkHandler should accept a Handler as argument, and check with a type assertion if the Handler also happens to implement BulkHandler. Or maybe it should just accept BulkHandlers (it doesn't really make sense to first buffer messages, and then handle them one by one through a normal Handler, or does it?).

My preference is for the argument type to be BulkHandler. It is more explicit, less magical, and as you pointed out, it doesn't make sense to buffer the records and then forward them one at a time.

Alternative Design

I have a bigger design concern though. I'm not sure that buffering the log.Records is the right scope. The LogglyHandler wants to batch up to a maximum number of bytes which we only know after the records have been formatted. Likewise, it makes more sense for a FileHandler to wrap the os.File with a bufio.Buffer to combine Write calls.

On the other hand we also want the capability to force a flush on certain events, such as the passage of time or the logging of an important message. This situation makes me think that a better solution involves some mechanism for sending a flush signal (or hint) through the handler chain. Some Handlers may respond to the flush signal, and others may generate it. In this approach we would add a LvlFlushHandler and a TimerFlushHandler to generate flush signals. Meanwhile BufferedFileHandler (new) and LogglyHandler would collect formatted log data in a bufio.Buffer and flush when the buffer is full or when a flush signal is received from upstream. The flush signal is implemented as an additional bool field on the log15.Record struct.

@GeertJohan
Author

I like the idea here; the Handler at the end of the chain (LogglyHandler, BufferedFileHandler, etc.) knows best how to buffer the messages, and when to flush (Loggly 5MB limit, etc.). So that definitely makes sense!
What I dislike is adding a Flush bool to the *log15.Record, and having a LvlFlushHandler and TimerFlushHandler. I think when the buffer is moved back to the Handlers themselves, they should be configurable to flush at a given level (defaulting to LvlCrit). It's probably only 4 lines of code per handler, so not really a duplication of code. The flush logic will also be able to interact directly with the Handler's internals. Furthermore, I can imagine situations where you want to flush one Handler at LvlWarn and the other only at LvlCrit. Adding a FlushLevel field to LogglyHandler and others makes that possible.

I'll think some more about this, maybe there's an even better solution that combines the best of both..

@inconshreveable
Owner

Just like all buffered IO APIs wrap the IO interfaces and add a Flush() method:

type BulkHandler interface {
    Handler
    Flush() error
}

Applications can then determine their own flushing strategies, and we can provide a few sane and easy ones.

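// LevelFlusher flushes h after every record at lvl or more severe.
// It returns a plain Handler because FuncHandler has no Flush method.
func LevelFlusher(lvl Lvl, h BulkHandler) Handler {
    return FuncHandler(func(r *Record) error {
        if err := h.Log(r); err != nil {
            return err
        }
        // log15 levels run from LvlCrit (lowest value) to LvlDebug,
        // so "at least as severe as lvl" is r.Lvl <= lvl.
        if r.Lvl <= lvl {
            return h.Flush()
        }
        return nil
    })
}

// TimedFlusher flushes h every d; Flush errors are dropped here.
func TimedFlusher(d time.Duration, h BulkHandler) BulkHandler {
    go func() {
        for range time.Tick(d) {
            h.Flush()
        }
    }()
    return h
}

// SyncFlusher flushes h after every record.
func SyncFlusher(h BulkHandler) Handler {
    return FuncHandler(func(r *Record) error {
        if err := h.Log(r); err != nil {
            return err
        }
        return h.Flush()
    })
}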

Obviously, this means each BulkHandler will have to be responsible for doing its own internal buffering of pending messages.

@GeertJohan
Author

But that doesn't allow you to set the max buffer size, and I still think it's a bit strange to have a signal on the handler tree. It also can be quite hard to implement good buffering with guaranteed non-blocking calls to Log (also when the buffer is flushing). And I think it doesn't make much sense to do that for each Handler that wants to be buffered.

What if we introduce a new type Buffer that is used by a buffered Handler? The Handler sets two funcs on the type: one to convert the *Record to a []byte, and another to handle the flush. The Handler itself can choose to expose some of the Buffer configuration through setter methods, such as the flush interval, flush level, and buffer size. This means that the Handler can also implement custom checks when setting those values (e.g. Loggly can't handle >5MB, so the buffer shouldn't be larger than that). The Handler can also set default values that actually make sense for that Handler.

Small example, I left out the synchronization for now.
I can work out a prototype if you like the idea.

// Buffer is exported so it can be used by subpackages
type Buffer struct {
    bufferSize    int
    flushInterval time.Duration
    flushLevel    Lvl

    currentNode *bufferNode
    bufferPool  sync.Pool // pool with used buffer slices to avoid allocations and gc
    //++ and other synchronization related fields

    convert func(*Record) []byte
    flush   func([]byte)
}

type bufferNode struct {
    bufferBytes []byte // slice cap == Buffer.bufferSize
    //++ other fields for synchronization and timeout
}

func NewBuffer(size int, interval time.Duration, convert func(*Record) []byte, flush func([]byte)) *Buffer {
    b := &Buffer{
        bufferSize:    size,
        flushInterval: interval,
        flushLevel:    LvlCrit, // default value

        convert: convert,
        flush:   flush,
    }
    //++ setup sync.Pool
    //++ create first bufferNode
    //++ setup synchronization and buffer rotation (Log() shouldn't block when buffer is being flushed)
    return b
}

// LogBuffered is not "Log" as we don't want *buffer to implement Handler
func (b *Buffer) LogBuffered(r *Record) {
    newBytes := b.convert(r)
    //++ starting here there should be synchronization to avoid races
    //++ calculate if newBytes will overflow the buffer. if yes flush that buffer
    //++ also should add newBytes to new buffer taken from the bufferPool
    b.currentNode.bufferBytes = append(b.currentNode.bufferBytes, newBytes...)
    //++ force a flush when r.Lvl <= b.flushLevel (lower Lvl values are more severe)
}

func (b *Buffer) SetBufferSize(size int) {
    //++ if buffer was used already: return error
    b.bufferSize = size
}

func (b *Buffer) SetFlushInterval(interval time.Duration) {
    //++ if buffer was used already: return error
    b.flushInterval = interval
}

func (b *Buffer) SetFlushLevel(lvl Lvl) {
    //++ if buffer was used already: return error
    b.flushLevel = lvl
}

// ##### and now how the buffer is used to create a buffered Handler

type bufferedFileHandler struct {
    buffer   *Buffer
    filename string
}

func BufferedFileHandler(filename string) Handler {
    h := &bufferedFileHandler{
        filename: filename,
    }
    // create new buffer with defaults that make sense for this Handler
    h.buffer = NewBuffer(1*1024*1024, 5*time.Second, h.convert, h.flush)
    return h
}

// Log returns an error so that *bufferedFileHandler satisfies Handler.
func (h *bufferedFileHandler) Log(r *Record) error {
    h.buffer.LogBuffered(r)
    return nil
}

func (h *bufferedFileHandler) convert(r *Record) []byte { return nil }
func (h *bufferedFileHandler) flush([]byte)             {}

// in this example we allow the user to change flushInterval and flushLevel but not bufferSize
func (h *bufferedFileHandler) SetFlushInterval(interval time.Duration) error {
    if interval == 42*time.Second {
        return errors.New("for a mysterious reason the BufferedFileHandler cannot flush exactly every 42 seconds")
    }
    h.buffer.SetFlushInterval(interval)
    return nil
}
func (h *bufferedFileHandler) SetFlushLevel(lvl Lvl) {
    h.buffer.SetFlushLevel(lvl)
}

@ChrisHines
Collaborator

type BulkHandler interface {
    Handler
    Flush() error
}

@inconshreveable Unfortunately this doesn't compose well with things like MultiHandler and FailoverHandler. It also just misses describing ext/SpeculativeHandler, which has a Flush() that does not return an error.

@GeertJohan convert func(*Record) []byte should be written convert Format, and more generally this approach does not feel like it fits well with log15. Like Unix commands, each Handler should be composable and do one thing well. The above provides exactly two reasons for flushing and doesn't allow for new reasons we haven't thought of yet.

In general I think we should be trying really hard to fit in with the Go standard library when it comes to buffering bytes. For example bufio.Writer conforms to the following interface:

type WriteFlusher interface {
    io.Writer
    Available() int // helps avoid splitting log records
    Flush() error
}

With that we could implement a StreamFlushHandler(wf WriteFlusher, fmtr Format) Handler which could be told when to flush either by an upstream Handler or the application. Given that, we could construct a pipeline such as:

import "gopkg.in/natefinch/lumberjack.v2"

lj := &lumberjack.Logger{
    Filename: "/path/to/file.log",
}
h := log15.StreamFlushHandler(bufio.NewWriter(lj), log15.JsonFormat())
h = log15.LvlFlusher(log15.LvlError, h)

log15.Root().SetHandler(h)

Furthermore, anyone can implement the WriteFlusher interface if they want to optimize buffering differently and it will just plug right in. Time based flushing could be implemented as a wrapper around bufio.Writer, or externally like LvlFlusher.

@GeertJohan
Author

If new Record-based reasons to flush a log arise, those can be added to Buffer and (when applicable) exposed through the Handlers using the Buffer.
Another approach is to custom-build the buffer for each buffering handler. That does make more sense, as it provides better integration with how and when the specific handler needs to flush.

I still don't like that the level-based flushing is part of the Handler pipeline. In the end the pipeline will contain a lot of lines like: log15.LvlFlusher(log15.LvlError, log15.IntervalFlusher(5*time.Second, log15.BufferedFileHandler("filename.log"))). I think the pipeline is meant to modify and handle log15.Records, not influence or manage the bytes and buffers they're eventually formatted into.

Furthermore, any buffering handler should by default flush at LvlCrit as a convenience to both new and advanced users. It is very likely that the application will exit or crash after a LvlCrit log. So these things should be enabled by default. This is also the case for interval based flushing.

How about a combination of the ideas we've gathered so far?
Let a buffered handler simply implement the normal Handler interface. Do export Flush() on it, but don't aim to use Flush from the pipeline. Have the buffered handler provide the SetFlushLevel, SetFlushInterval, and SetBufferSize methods when applicable. This enables the user to modify all flushing parameters directly on the handler, while at the same time allowing external Flush() calls (e.g. for graceful shutdown).

I wouldn't always want to use an external buffer (e.g. bufio) because of performance reasons. The Handler should have direct access to the buffer so it can be optimized to minimize allocations and flush without having a blocking Log() method.

@ChrisHines
Collaborator

I disagree. I view Handlers as a highly composable set of tools for building arbitrary log record routing and filtering trees. I think LvlFilterHandler sets plenty of precedent for making level based decisions in a handler. Furthermore, I propose that whether or not a given Record is important enough to warrant flushing buffers is a property of the Record, not the buffer. If you accept that premise then changing that property of a Record is best done by a Handler. In addition, it becomes trivially easy to implement new reasons for flushing—post-hoc—without having to edit the buffer management code.

h := log15.FuncHandler(func (r *log15.Record) error {
    if r.Msg == "fizzbuzz" {
        r.Flush = true
    }
    return nil
})

This approach does not prevent someone from writing a monolithic Handler that does everything precisely as their application requires with ultimate efficiency. But log15 is "modeled after the standard library's io and net/http packages" and that is how I try to judge contributions.

Finally, I usually write my handler chains like this:

h := log15.BufferedFileHandler("filename.log")
h = log15.IntervalFlusher(5*time.Second, h)
h = log15.LvlFlusher(log15.LvlError, h)

log15.Root().SetHandler(h)

I see no problem with that. It is done once during set up and is easy to understand and maintain.

@GeertJohan
Author

Hmm... But then the Handler would look like the one @inconshreveable proposed?

type BulkHandler interface {
    Handler
    Flush() error
}

And what about MultiHandler or FailoverHandler?
Can I put an IntervalFlusher before a MultiHandler? Should those Handlers now implement and link Flush() signalling calls down the chain?
And what happens when an IntervalFlusher fires a Flush() 500ms after the buffer was flushed due to the size limit? Just unfortunate? Or can we do something about that?

I think I have one more idea that I'd like to present, and maybe you'll even like it. It's different from what we've seen so far, but I'll need to work it out a bit better. I'll get back to that tomorrow. After that, let's just make the call and implement one of the solutions. We've probably spent enough time thinking about this already.

@ChrisHines
Collaborator

@GeertJohan My example pipeline was just a reformatted copy of yours. I am not in favor of BufferedFileHandler.

My preferred approach remains the one I described in my comment above that defines the WriteFlusher interface. In that post I proposed a StreamFlushHandler that would be a normal Handler. My untested implementation follows.

func StreamFlushHandler(wf WriteFlusher, fmtr Format) Handler {
    h := FuncHandler(func(r *Record) error {
        _, err := wf.Write(fmtr.Format(r))
        if err != nil || !r.Flush {
            return err
        }
        return wf.Flush()
    })
    return LazyHandler(SyncHandler(h))
}

This is a very slightly modified copy of StreamHandler. The buffering (if any) is delegated to the passed implementation of WriteFlusher (which bufio.Writer implements). I note that this version doesn't need the Available() method, so WriteFlusher might be overspecified; that is to be determined if we implement it for real.

@GeertJohan
Author

In this case the *Record still contains a field Flush bool? Or has that idea been dropped?

@GeertJohan
Author

I don't think all buffering Handlers should work with a WriteFlusher, Flush doesn't always apply to a stream of bytes. WriteFlusher won't work for the LogglyHandler. So this only solves the case for StreamFlushHandler, right?

I think we should aim Flush() to be a signal that all records must be flushed, it shouldn't matter in what form, it could even be *Record > text-to-audio > /dev/audio..

Am now working on that new approach that I think combines all our requirements and most wishes.

@ChrisHines
Collaborator

In this case the *Record still contains a field Flush bool? Or has that idea been dropped?

Yes. I still consider that idea viable.

WriteFlusher won't work for the LogglyHandler.

It might work if we had a LogglyWriter though. Then LogglyHandler becomes:

func LogglyHandler(...) Handler {
    w := loggly.NewWriter(...)
    f := loggly.NewFormat(...)
    return StreamFlushHandler(w, f)
}

And if we had a LogglyWriter it could be used to do batch uploads like so:

f, err := os.Open("/path/to/log-file")
if err != nil {
    // handle error
}
io.Copy(loggly.NewWriter(...), f)

At that point the LogglyWriter might as well go in its own repo because it is useful in contexts other than log15.

I think we should aim Flush() to be a signal that all records must be flushed

In my mind the Flush bool on a Record is a hint that the record should be flushed as soon as possible, which implies that all unflushed records ahead of it in the pipe also get flushed. Any Handler can inspect the flush flag and take appropriate action, so I think it works in the general case.

Am now working on that new approach that I think combines all our requirements and most wishes.

I look forward to what you come up with.

@GeertJohan
Author

It's been a while, sorry for not responding.. Busy times..

I still see some problems with Flush bool. One is that because the Record is passed by pointer in the handler tree the bool will 'leak' through to other handlers. See this example:

h1 := log15.SomeHandler()
h1 = log15.LvlFlusher(log15.LvlDebug, h1)
h2 := log15.SomeHandler()
h2 = log15.LvlFlusher(log15.LvlError, h2)
h12 := log15.MultiHandler(h1, h2)
log15.Root().SetHandler(h12)

Although the Flush bool would be a good addition, it should probably only be set before or at entry of the logging tree. The developer could decide to set the field true for whatever reason and all handlers should take appropriate action.
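The leak can be reproduced with a small self-contained sketch. The types below are local stand-ins for log15 (with the proposed Flush field added), and sawFlush is a hypothetical test sink. The second half shows one possible mitigation, marking a shallow copy of the record, which is an assumption of this sketch and not something proposed in the thread.

```go
package main

import "fmt"

type Lvl int

const (
	LvlCrit Lvl = iota // most severe level has the lowest value, as in log15
	LvlError
	LvlWarn
	LvlInfo
	LvlDebug
)

type Record struct {
	Lvl   Lvl
	Msg   string
	Flush bool
}

type Handler interface {
	Log(r *Record) error
}

type HandlerFunc func(r *Record) error

func (f HandlerFunc) Log(r *Record) error { return f(r) }

// MultiHandler passes the same *Record to every child (errors ignored here).
func MultiHandler(hs ...Handler) Handler {
	return HandlerFunc(func(r *Record) error {
		for _, h := range hs {
			h.Log(r)
		}
		return nil
	})
}

// LvlFlusher mutates the shared record in place.
func LvlFlusher(lvl Lvl, next Handler) Handler {
	return HandlerFunc(func(r *Record) error {
		if r.Lvl <= lvl {
			r.Flush = true
		}
		return next.Log(r)
	})
}

// LvlFlusherCopy marks a shallow copy, so sibling branches are unaffected.
func LvlFlusherCopy(lvl Lvl, next Handler) Handler {
	return HandlerFunc(func(r *Record) error {
		if r.Lvl <= lvl {
			c := *r
			c.Flush = true
			r = &c
		}
		return next.Log(r)
	})
}

// sawFlush stores the Flush flag observed at the end of a branch.
func sawFlush(seen *bool) Handler {
	return HandlerFunc(func(r *Record) error {
		*seen = r.Flush
		return nil
	})
}

func main() {
	var s1, s2 bool
	h := MultiHandler(LvlFlusher(LvlDebug, sawFlush(&s1)), LvlFlusher(LvlError, sawFlush(&s2)))
	h.Log(&Record{Lvl: LvlInfo, Msg: "hello"})
	fmt.Println("in place:", s1, s2) // the second branch sees the leaked flag

	h = MultiHandler(LvlFlusherCopy(LvlDebug, sawFlush(&s1)), LvlFlusherCopy(LvlError, sawFlush(&s2)))
	h.Log(&Record{Lvl: LvlInfo, Msg: "hello"})
	fmt.Println("with copy:", s1, s2)
}
```

With in-place mutation the LvlError branch flushes on an info record because the LvlDebug branch ran first; with the copy, each branch only sees its own decision.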

Interval-based flushing still doesn't make sense in the handler tree; its goal is to fire a flush when none took place for a given duration. So it should be notified of flushes.

Flushing in general isn't a concept all handlers could implement. Let's say there is a BeepHandler (beeps when Record.Lvl <= log15.LvlError). It can't do anything with the flush flag. What I'm trying to point out here is that I think the flusher shouldn't go in the handler tree, but instead be plugged right before or onto compatible (flushable) handlers.

Anyway, the idea I had.... I have some actual code about how this would work internally, but let's skip that for now and just look at how the user would use/implement this. Just to see how it feels and discuss if this might be a good way to set it all up.

h := log15.BufferedFileHandler("filename.log")
h.AddFlusher(LvlFlusher(log15.LvlError))
h.AddFlusher(IntervalFlusher(15*time.Second))
log15.Root().SetHandler(h)

AddFlusher is a way to add generic flushers to a flushable handler. The handler itself could also have specific flushing rules. For example: the bulk upload limit at Loggly must cause a flush. This would work with some channels to communicate between the Flusher and the Handler. There would also be a method to pass each *Record by the Flusher for inspection. AddFlusher takes a Flusher interface type, so anyone can implement a custom Flusher.
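Since the actual code is not shown, the following is only a guess at how the record-inspection half of such an API might feel. Every name in it (Flusher, ShouldFlush, FlusherFunc, bufferedHandler) is hypothetical, Record and Lvl are local stand-ins for the log15 types, and the channel-based communication and the IntervalFlusher are left out entirely.

```go
package main

import "fmt"

type Lvl int

const (
	LvlCrit Lvl = iota // most severe level has the lowest value, as in log15
	LvlError
	LvlWarn
	LvlInfo
	LvlDebug
)

type Record struct {
	Lvl Lvl
	Msg string
}

// Flusher inspects each record and reports whether it should trigger a flush.
type Flusher interface {
	ShouldFlush(r *Record) bool
}

type lvlFlusher struct{ lvl Lvl }

func (f lvlFlusher) ShouldFlush(r *Record) bool { return r.Lvl <= f.lvl }

// LvlFlusher requests a flush for records at lvl or more severe.
func LvlFlusher(lvl Lvl) Flusher { return lvlFlusher{lvl: lvl} }

// FlusherFunc lets a plain function act as a custom Flusher.
type FlusherFunc func(r *Record) bool

func (f FlusherFunc) ShouldFlush(r *Record) bool { return f(r) }

// bufferedHandler is a flushable handler that keeps batches in memory.
type bufferedHandler struct {
	flushers []Flusher
	pending  []string
	batches  [][]string // flushed batches, kept only for the demo
}

func (h *bufferedHandler) AddFlusher(f Flusher) { h.flushers = append(h.flushers, f) }

// Log buffers the record, then lets every attached Flusher inspect it.
func (h *bufferedHandler) Log(r *Record) error {
	h.pending = append(h.pending, r.Msg)
	for _, f := range h.flushers {
		if f.ShouldFlush(r) {
			return h.Flush()
		}
	}
	return nil
}

func (h *bufferedHandler) Flush() error {
	if len(h.pending) > 0 {
		h.batches = append(h.batches, h.pending)
		h.pending = nil
	}
	return nil
}

func main() {
	h := &bufferedHandler{}
	h.AddFlusher(LvlFlusher(LvlError))
	h.AddFlusher(FlusherFunc(func(r *Record) bool { return r.Msg == "fizzbuzz" }))

	h.Log(&Record{Lvl: LvlInfo, Msg: "a"})
	h.Log(&Record{Lvl: LvlError, Msg: "b"})       // level flusher triggers
	h.Log(&Record{Lvl: LvlInfo, Msg: "fizzbuzz"}) // custom flusher triggers
	h.Log(&Record{Lvl: LvlInfo, Msg: "c"})        // stays pending
	fmt.Println(h.batches, h.pending)
}
```

Because each handler owns its list of flushers, two handlers under the same MultiHandler can flush at different levels without sharing any state on the Record.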

The Flush bool field could still be in, but just as a way for the log15 user to force all flushable handlers in the tree to flush by setting the field when creating the Record. It shouldn't be set by a handler in the tree.

It's been a while since I worked on it, but the code I have for this is almost a working example/proof-of-concept, so I could work this out... Please let me know what you think.

@ChrisHines
Collaborator

Sorry for the long delay.

Interval-based flushing still doesn't make sense in the handler tree; its goal is to fire a flush when none took place for a given duration. So it should be notified of flushes.

This is the strongest argument against a flush bool field in Record.

Your suggestion seems like a lot of API surface to support flushing. At this point I think the whole concept is best left out of the core library until a simpler solution is found.


3 participants