Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce learn #95

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/reduce/applier/reducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ import (
// ReduceApplier applies the GRPCBasedReduceUDF on the stream of read messages and gives back a new message.
type ReduceApplier interface {
// ApplyReduce applies the reduce UDF on the stream of window requests and streams the timed window response.
// doesn't wait for the response for all the keys in the window, before sending the response back.
// It doesn't wait for the response for all the keys in the window, before sending the response back.
ApplyReduce(ctx context.Context, partitionID *partition.ID, messageStream <-chan *window.TimedWindowRequest) (<-chan *window.TimedWindowResponse, <-chan error)
}
4 changes: 2 additions & 2 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (df *DataForward) ReplayPersistedMessages(ctx context.Context) error {
df.log.Infow("Partitions to be replayed ", zap.Int("count", len(partitions)), zap.Any("partitions", partitions))

// for aligned windows, we can create/assign a partition based on the timestamp in the payload, but this is not
// neccessarily true for unaligned windows.
// necessarily true for unaligned windows.

for _, p := range partitions {
// create a window for each partition and insert it to the windower
Expand Down Expand Up @@ -418,7 +418,7 @@ messagesLoop:

err := df.writeToPBQ(ctx, winOp)
// there is no point continuing because we are seeing an error.
// this error will ONLY BE set if we are in a error loop and ctx.Done() has been invoked.
// this error will ONLY BE set if we are in an error loop and ctx.Done() has been invoked.
if err != nil {
df.log.Errorw("Failed to write message, asked to stop trying", zap.Any("msgOffSet", message.ReadOffset.String()), zap.String("partitionID", winOp.ID.String()), zap.Error(err))
failedMessages = append(failedMessages, messages[i:]...)
Expand Down
5 changes: 3 additions & 2 deletions pkg/reduce/pbq/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// partition is a tuple containing (start, end) time and an optional slot.
// Window contains a partition because Window containts the keys too.
// Package partition is a tuple containing (start, end) time and an optional slot.
// Window contains a partition because Window contains the keys too.
// A partition is used to map a message to a pbq instance.
package partition

import (
Expand Down
4 changes: 2 additions & 2 deletions pkg/reduce/pnf/processandforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ outerLoop:
for {
select {
case err := <-errCh:
if err == ctx.Err() {
if errors.Is(err, ctx.Err()) {
return
}
if err != nil {
Expand Down Expand Up @@ -170,7 +170,7 @@ outerLoop:
for {
select {
case err := <-errCh:
if err == ctx.Err() {
if errors.Is(err, ctx.Err()) {
return
}
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions pkg/window/sorted_window_endtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func (s *SortedWindowListByEndTime) Delete(window TimedWindow) (deleted bool) {
return deleted
}

// RemoveWindows removes a set of windows smaller than or equal to the given time.
// RemoveWindows removes a set of windows whose end time is smaller than or equal to the given time.
// It returns the removed windows.
func (s *SortedWindowListByEndTime) RemoveWindows(t time.Time) []TimedWindow {
s.lock.Lock()
defer s.lock.Unlock()
Expand Down Expand Up @@ -241,7 +242,7 @@ func (s *SortedWindowListByEndTime) WindowToBeMerged(window TimedWindow) (TimedW
return !s.windows[i].EndTime().Before(window.EndTime())
})

// since its only sorted by end time, we have to check for all the windows with the same end time greater than or equal to the given window
// since it's only sorted by end time, we have to check for all the windows with end time greater than or equal to the given window
// for example if the windows are (60, 70), (50, 90) and (30, 100), if we are given a window (35,45) we should return (30,100)
// if we just check the window at index 0, we will return (60,70) which is incorrect
for i := index; i < len(s.windows); i++ {
Expand All @@ -260,6 +261,10 @@ func (s *SortedWindowListByEndTime) WindowToBeMerged(window TimedWindow) (TimedW
return nil, false
}

// compareKeys returns true if the keys are equal.
// The order of the keys matters, e.g. ["a", "b"] != ["b", "a"].
// A concrete example can be that we are tracking API calls among applications, each call can be represented as keys [client_id, service_id].
// In this case, ["app_1", "app_2"] != ["app_2", "app_1"] because app_1 calling app_2 is different from app_2 calling app_1.
func compareKeys(a, b []string) bool {
if len(a) != len(b) {
return false
Expand Down
29 changes: 17 additions & 12 deletions pkg/window/strategy/fixed/fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func NewFixedWindow(length time.Duration, message *isb.ReadMessage) window.Timed
}
}

var _ window.TimedWindow = (*fixedWindow)(nil)

func (w *fixedWindow) StartTime() time.Time {
return w.startTime
}
Expand All @@ -76,21 +78,21 @@ func (w *fixedWindow) Partition() *partition.ID {
}
}

func (w *fixedWindow) Merge(tw window.TimedWindow) {
// never be invokved for Aligned Window
func (w *fixedWindow) Merge(_ window.TimedWindow) {
// never be invoked for Aligned Window
}

func (w *fixedWindow) Expand(endTime time.Time) {
// never be invokved for Aligned Window
func (w *fixedWindow) Expand(_ time.Time) {
// never be invoked for Aligned Window
}

// Windower is a implementation of TimedWindower of fixed window, windower is responsible for assigning
// Windower is an implementation of TimedWindower of fixed window, windower is responsible for assigning
// windows to the incoming messages and closing the windows that are past the watermark.
type Windower struct {
// Length is the temporal length of the window.
length time.Duration
// we track all the active windows, we store the windows sorted by end time
// so its easy to find the window
// so it's easy to find the window
activeWindows *window.SortedWindowListByEndTime
// closedWindows is a list of closed windows which are yet to be GCed
// we need to track the close windows because while publishing the watermark
Expand All @@ -117,9 +119,8 @@ func (w *Windower) Type() window.Type {
}

// AssignWindows assigns the event to the window based on give window configuration.
// AssignWindows returns a array of TimedWindoweRequest to window message. Partition id is used to
// identify the pbq instance to which the message should be assigned. fixedWindow message contains
// the isb message and the window operation, where the operation can be OPEN | APPEND.
// For fixed window, the message is assigned to one single window.
// The operation can be either OPEN or APPEND, depending on whether the window is already present or not.
func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindowRequest {
win, isPresent := w.activeWindows.InsertIfNotPresent(NewFixedWindow(w.length, message))

Expand All @@ -138,7 +139,7 @@ func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindow
return []*window.TimedWindowRequest{winOp}
}

// InsertWindow inserts window to the list of active windows
// InsertWindow inserts a window to the list of active windows
func (w *Windower) InsertWindow(tw window.TimedWindow) {
w.activeWindows.InsertIfNotPresent(tw)
}
Expand All @@ -147,12 +148,15 @@ func (w *Windower) InsertWindow(tw window.TimedWindow) {
// returns a list of TimedWindowRequests, each request contains the window operation and the window
// which needs to be closed.
func (w *Windower) CloseWindows(time time.Time) []*window.TimedWindowRequest {
// FIXME - we are updating both active and closed windows.
// We need to make this method atomic.
// Same for other windower methods.
winOperations := make([]*window.TimedWindowRequest, 0)
closedWindows := w.activeWindows.RemoveWindows(time)
for _, win := range closedWindows {
winOp := &window.TimedWindowRequest{
ReadMessage: nil,
// we can call Delete here because in Aligned window, we are sure that COB has been called for all the keys
// we can call Delete here because in an aligned window, we are sure that COB has been called for all the keys
Operation: window.Delete,
Windows: []window.TimedWindow{win},
ID: win.Partition(),
Expand All @@ -173,7 +177,8 @@ func (w *Windower) DeleteClosedWindow(response *window.TimedWindowResponse) {
w.closedWindows.Delete(response.Window)
}

// OldestWindowEndTime returns the end time of the oldest window.
// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
// If there are no windows, it returns -1.
func (w *Windower) OldestWindowEndTime() time.Time {
if win := w.closedWindows.Front(); win != nil {
return win.EndTime()
Expand Down
20 changes: 10 additions & 10 deletions pkg/window/strategy/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func NewSessionWindow(startTime time.Time, gap time.Duration, message *isb.ReadM
}
}

var _ window.TimedWindow = (*sessionWindow)(nil)

func (w *sessionWindow) StartTime() time.Time {
return w.startTime
}
Expand Down Expand Up @@ -105,17 +107,17 @@ func cloneWindow(win window.TimedWindow) *sessionWindow {
}
}

// Expand expands the window. An interesting property of Unaligned windows :).
// Expand expands the window end time to the new endTime. An interesting property of Unaligned windows :).
func (w *sessionWindow) Expand(endTime time.Time) {
if endTime.After(w.endTime) {
w.endTime = endTime
}
}

// Windower is a implementation of TimedWindower of session window, windower is responsible for assigning
// Windower is an implementation of TimedWindower of session window, windower is responsible for assigning
// windows to the incoming messages and closing the windows that are past the watermark.
type Windower struct {
// gap is the duration after which the session is marked as closed.
// gap is the duration of inactivity after which a session window is marked as closed.
gap time.Duration

// activeWindows is a map of keys to list of active windows
Expand Down Expand Up @@ -152,9 +154,8 @@ func (*Windower) Type() window.Type {
// AssignWindows assigns the event to the window based on give window configuration. This assignment could trigger the following
// - New window Creation
// - Expand an existing window
// - Append to an existing window (event time has the event-time such that gap + event-time is < window end time).
// - Append to an existing window (the message has the event-time such that gap + event-time is < window end time).
func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindowRequest {

var (
combinedKey = strings.Join(message.Keys, delimiter)
windowOperations = make([]*window.TimedWindowRequest, 0)
Expand Down Expand Up @@ -214,7 +215,7 @@ func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindow
return windowOperations
}

// InsertWindow inserts window to the list of active windows
// InsertWindow inserts a window to the list of active windows.
func (w *Windower) InsertWindow(tw window.TimedWindow) {
combinedKey := strings.Join(tw.Keys(), delimiter)
if list, ok := w.activeWindows[combinedKey]; !ok {
Expand All @@ -228,7 +229,7 @@ func (w *Windower) InsertWindow(tw window.TimedWindow) {

func createWindowOperation(message *isb.ReadMessage, event window.Operation, windows []window.TimedWindow, id *partition.ID) *window.TimedWindowRequest {
// clone the windows because the windows might be updated after the operation is sent to the server.
// we do inplace updates, but those will be merged later on, hence correctness won't be affected.
// we do in-place updates, but those will be merged later on, hence correctness won't be affected.
var clonedWindows = make([]window.TimedWindow, 0)
for _, win := range windows {
clonedWindows = append(clonedWindows, cloneWindow(win))
Expand Down Expand Up @@ -323,9 +324,8 @@ func (w *Windower) DeleteClosedWindow(response *window.TimedWindowResponse) {
w.closedWindows.Delete(response.Window)
}

// OldestWindowEndTime returns the end time of the oldest window.
// if there are no closed windows, it returns the end time of the oldest active window
// if there are no windows, it returns -1
// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
// If there are no windows, it returns -1.
func (w *Windower) OldestWindowEndTime() time.Time {
if win := w.closedWindows.Front(); win != nil {
return win.EndTime()
Expand Down
23 changes: 14 additions & 9 deletions pkg/window/strategy/sliding/sliding.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func NewSlidingWindow(startTime time.Time, endTime time.Time) window.TimedWindow
}
}

var _ window.TimedWindow = (*slidingWindow)(nil)

func (w *slidingWindow) StartTime() time.Time {
return w.startTime
}
Expand All @@ -75,15 +77,15 @@ func (w *slidingWindow) Partition() *partition.ID {
}

// Merge merges the given window with the current window.
func (w *slidingWindow) Merge(tw window.TimedWindow) {
// never be invokved for Aligned Window
func (w *slidingWindow) Merge(_ window.TimedWindow) {
// never be invoked for Aligned Window
}

func (w *slidingWindow) Expand(endTime time.Time) {
// never be invokved for Aligned Window
func (w *slidingWindow) Expand(_ time.Time) {
// never be invoked for Aligned Window
}

// Windower is a implementation of TimedWindower of sliding window, windower is responsible for assigning
// Windower is an implementation of TimedWindower of sliding window, windower is responsible for assigning
// windows to the incoming messages and closing the windows that are past the watermark.
type Windower struct {
// Length is the temporal length of the window.
Expand Down Expand Up @@ -118,12 +120,14 @@ func (*Windower) Type() window.Type {
}

// AssignWindows assigns the event to the window based on give window configuration.
// For sliding window, the message can be assigned to multiple windows.
// The operation can be either OPEN or APPEND, depending on whether the window is already present or not.
func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindowRequest {
windowOperations := make([]*window.TimedWindowRequest, 0)

// use the highest integer multiple of slide length which is less than the eventTime
// as the start time for the window. For example if the eventTime is 810 and slide
// length is 70, use 770 as the startTime of the window. In that way we can be guarantee
// as the start time for the window. For example, if the eventTime is 810 and slide
// length is 70, use 770 as the startTime of the window. In that way, we can guarantee
// consistency while assigning the messages to the windows.
startTime := time.UnixMilli((message.EventTime.UnixMilli() / w.slide.Milliseconds()) * w.slide.Milliseconds())
endTime := startTime.Add(w.length)
Expand Down Expand Up @@ -160,7 +164,7 @@ func (w *Windower) AssignWindows(message *isb.ReadMessage) []*window.TimedWindow
return windowOperations
}

// InsertWindow inserts window to the list of active windows
// InsertWindow inserts a window to the list of active windows.
func (w *Windower) InsertWindow(tw window.TimedWindow) {
w.activeWindows.InsertIfNotPresent(tw)
}
Expand Down Expand Up @@ -192,7 +196,8 @@ func (w *Windower) DeleteClosedWindow(response *window.TimedWindowResponse) {
w.closedWindows.Delete(response.Window)
}

// OldestWindowEndTime returns the end time of the oldest window.
// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
// If there are no windows, it returns -1.
func (w *Windower) OldestWindowEndTime() time.Time {
if win := w.closedWindows.Front(); win != nil {
return win.EndTime()
Expand Down
12 changes: 8 additions & 4 deletions pkg/window/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
)

// TimedWindower is the interface for windowing strategy.
// It manages the lifecycle of timed windows for a reduce vertex.
// It maintains a list of timed windows locally, generates window requests to be sent to the reduce UDF,
// and reflects the changes to the list of timed windows based on the response from the UDF.
type TimedWindower interface {
// Strategy returns the window strategy.
Strategy() Strategy
Expand All @@ -33,17 +36,18 @@ type TimedWindower interface {
AssignWindows(message *isb.ReadMessage) []*TimedWindowRequest
// CloseWindows closes the windows that are past the watermark.
CloseWindows(time time.Time) []*TimedWindowRequest
// InsertWindow inserts window to the list of active windows
// InsertWindow inserts a window to the list of active windows.
InsertWindow(tw TimedWindow)
// NextWindowToBeClosed returns the next window yet to be closed.
NextWindowToBeClosed() TimedWindow
// DeleteClosedWindow deletes the window from the closed windows list.
DeleteClosedWindow(response *TimedWindowResponse)
// OldestWindowEndTime returns the end time of the oldest window.
// OldestWindowEndTime returns the end time of the oldest window among both active and closed windows.
// If there are no windows, it returns -1.
OldestWindowEndTime() time.Time
}

// TimedWindow represents a time based window.
// TimedWindow represents a time-based window.
type TimedWindow interface {
// StartTime returns the start time of the window.
StartTime() time.Time
Expand Down Expand Up @@ -149,7 +153,7 @@ type TimedWindowRequest struct {
Windows []TimedWindow
}

// TimedWindowResponse is the response from the UDF based on how the result is propogated back.
// TimedWindowResponse is the response from the UDF based on how the result is propagated back.
// It could be one or more responses based on how many results the user is streaming out.
type TimedWindowResponse struct {
// WriteMessage represents the isb message
Expand Down
20 changes: 16 additions & 4 deletions test/reduce-e2e/reduce_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -197,7 +195,7 @@ func (r *ReduceSuite) TestSimpleReducePipelineFailOverUsingWAL() {
done <- struct{}{}
}

// two reduce vertex(keyed and non keyed) followed by a sliding window vertex
// two reduce vertices (keyed and non-keyed) followed by a sliding window vertex
func (r *ReduceSuite) TestComplexSlidingWindowPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
Expand All @@ -222,17 +220,31 @@ func (r *ReduceSuite) TestComplexSlidingWindowPipeline() {
case <-done:
return
default:
// send the number "1" and "2" to the pipeline every second
eventTime := strconv.Itoa(startTime + i*1000)
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("1")).WithHeader("X-Numaflow-Event-Time", eventTime)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2")).WithHeader("X-Numaflow-Event-Time", eventTime))
}
}
}()

// At the keyed reduce vertex, the 5-second fixed window produces output every 5 seconds as
// {key: "even", value: 10(2*5s=10)} and {key: "odd", value: 5(1*5s=5)}.
// At the non-keyed reduce vertex, the 10-second fixed window produces output every 10 seconds as
// {value: 30} ((10+5)*(10s/5s) = 30)
// At the non-keyed sliding window vertex,
// the sliding window is configured with length 60s.
// At the first 10s, the window output is {value: 30}
// At the second 10s, the output is {value: 60} (30+30 = 60)
// It goes on like this, and at the 6th 10s, the output is {value: 180}
// At the 7th 10s, the output remains 180 as the window slides forward.
w.Expect().
SinkContains("sink", "30").
SinkContains("sink", "60").
SinkContains("sink", "180")
SinkNotContains("sink", "80").
SinkContains("sink", "90").
SinkContains("sink", "180").
SinkNotContains("sink", "210")
done <- struct{}{}
}

Expand Down
Loading