Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tree removal #9

Merged
merged 4 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 134 additions & 28 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/ohler55/ojg/jp"
)

var (
ErrEvaluableNotFound = fmt.Errorf("Evaluable instance not found in aggregator")
)

// errTreeUnimplemented is used while we develop the aggregate tree library when trees
// are not yet implemented.
var errTreeUnimplemented = fmt.Errorf("tree type unimplemented")
Expand Down Expand Up @@ -70,6 +74,11 @@ func NewAggregateEvaluator(
}

type Evaluable interface {
// Identifier returns a unique identifier for the evaluable item. If there are
// two instances of the same expression, the identifier should return a unique
// string for each instance of the expression (eg. for two pauses).
Identifier() string

// Expression returns an expression as a raw string.
Expression() string
}
Expand Down Expand Up @@ -137,12 +146,21 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
// are added (eg. >= operators on strings), ensure that we find the correct number of matches
// for each group ID and then skip evaluating expressions if the number of matches is <= the group
// ID's length.
seen := map[groupID]struct{}{}

for _, match := range matches {
if _, ok := seen[match.GroupID]; ok {
continue
}

atomic.AddInt32(&matched, 1)
// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, match.Parsed.Evaluable, data)

seen[match.GroupID] = struct{}{}

if evalerr != nil {
err = errors.Join(err, evalerr)
continue
Expand All @@ -161,6 +179,12 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([
a.lock.RLock()
defer a.lock.RUnlock()

// Store the number of times each GroupID has found a match. We need at least
// as many matches as stored in the group ID to consider the match.
counts := map[groupID]int{}
// Store all expression parts per group ID for returning.
found := map[groupID][]ExpressionPart{}

// Iterate through all known variables/idents in the aggregate tree to see if
// the data has those keys set. If so, we can immediately evaluate the data with
// the tree.
Expand All @@ -179,16 +203,32 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([

switch cast := res[0].(type) {
case string:
found, ok := tree.Search(ctx, cast)
all, ok := tree.Search(ctx, cast)
if !ok {
continue
}
result = append(result, found.Evals...)

for _, eval := range all.Evals {
counts[eval.GroupID] += 1
if _, ok := found[eval.GroupID]; !ok {
found[eval.GroupID] = []ExpressionPart{}
}
found[eval.GroupID] = append(found[eval.GroupID], eval)
}
default:
continue
}
}

for k, count := range counts {
if int(k.Size()) > count {
// The GroupID required more comparisons to equate to true than
// we had, so this could never evaluate to true. Skip this.
continue
}
result = append(result, found[k]...)
}

return result, nil
}

Expand All @@ -202,12 +242,9 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
return false, err
}

// NOTE: When modifying, ensure that Remove() is updated. We should reconcile
// the core loops to use the same code.

aggregateable := true
for _, g := range parsed.RootGroups() {
ok, err := a.addGroup(ctx, g, parsed)
ok, err := a.iterGroup(ctx, g, parsed, a.addNode)
if err != nil {
return false, err
}
Expand All @@ -228,7 +265,51 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (bool, error) {
return aggregateable, nil
}

func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExpression) (bool, error) {
func (a *aggregator) Remove(ctx context.Context, eval Evaluable) error {
// parse the expression using our tree parser.
parsed, err := a.parser.Parse(ctx, eval)
if err != nil {
return err
}

aggregateable := true
for _, g := range parsed.RootGroups() {
ok, err := a.iterGroup(ctx, g, parsed, a.removeNode)
if err == ErrExpressionPartNotFound {
return ErrEvaluableNotFound
}
if err != nil {
return err
}
if !ok && aggregateable {
// Find the index of the evaluable in constants and yank out.
idx := -1
for n, item := range a.constants {
if item.Evaluable.Identifier() == eval.Identifier() {
idx = n
break
}
}

if idx == -1 {
return ErrEvaluableNotFound
}

a.lock.Lock()
a.constants = append(a.constants[:idx], a.constants[idx+1:]...)
a.lock.Unlock()
aggregateable = false
}
}

if aggregateable {
atomic.AddInt32(&a.len, -1)
}

return nil
}

func (a *aggregator) iterGroup(ctx context.Context, node *Node, parsed *ParsedExpression, op nodeOp) (bool, error) {
if len(node.Ors) > 0 {
// If there are additional branches, don't bother to add this to the aggregate tree.
// Mark this as a non-exhaustive addition and skip immediately.
Expand All @@ -238,16 +319,26 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExp
return false, nil
}

// Merge all of the nodes together and check whether each node is aggregateable.
all := append(node.Ands, node)
for _, n := range all {
if !n.HasPredicate() || len(n.Ors) > 0 {
// Don't handle sub-branching for now.
return false, nil
if len(node.Ands) > 0 {
for _, n := range node.Ands {
if !n.HasPredicate() || len(n.Ors) > 0 {
// Don't handle sub-branching for now.
return false, nil
}
if !isAggregateable(n) {
return false, nil
}
}
if !isAggregateable(n) {
}

all := node.Ands

if node.Predicate != nil {
if !isAggregateable(node) {
return false, nil
}
// Merge all of the nodes together and check whether each node is aggregateable.
all = append(node.Ands, node)
}

// Create a new group ID which tracks the number of expressions that must match
Expand All @@ -258,9 +349,8 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExp
// When checking an incoming event, we match the event against each node's
// ident/variable. Using the group ID, we can see if we've matched N necessary
// items from the same identifier. If so, the evaluation is true.
groupID := newGroupID(uint16(len(all)))
for _, n := range all {
err := a.addNode(ctx, n, groupID, parsed)
err := op(ctx, n, parsed)
if err == errTreeUnimplemented {
return false, nil
}
Expand All @@ -272,7 +362,10 @@ func (a *aggregator) addGroup(ctx context.Context, node *Node, parsed *ParsedExp
return true, nil
}

func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, parsed *ParsedExpression) error {
// nodeOp represents an op eg. addNode or removeNode
type nodeOp func(ctx context.Context, n *Node, parsed *ParsedExpression) error

func (a *aggregator) addNode(ctx context.Context, n *Node, parsed *ParsedExpression) error {
// Don't allow anything to update in parallel. This enrues that Add() can be called
// concurrently.
a.lock.Lock()
Expand All @@ -286,7 +379,7 @@ func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, parsed *
tree = newArtTree()
}
err := tree.Add(ctx, ExpressionPart{
GroupID: gid,
GroupID: n.GroupID,
Predicate: *n.Predicate,
Parsed: parsed,
})
Expand All @@ -299,18 +392,31 @@ func (a *aggregator) addNode(ctx context.Context, n *Node, gid groupID, parsed *
return errTreeUnimplemented
}

func (a *aggregator) Remove(ctx context.Context, eval Evaluable) error {
// parse the expression using our tree parser.
parsed, err := a.parser.Parse(ctx, eval)
if err != nil {
return err
}
func (a *aggregator) removeNode(ctx context.Context, n *Node, parsed *ParsedExpression) error {
// Don't allow anything to update in parallel. This enrues that Add() can be called
// concurrently.
a.lock.Lock()
defer a.lock.Unlock()

for _, g := range parsed.RootGroups() {
_ = g
// Each node is aggregateable, so add this to the map for fast filtering.
switch n.Predicate.TreeType() {
case TreeTypeART:
tree, ok := a.artIdents[n.Predicate.Ident]
if !ok {
tree = newArtTree()
}
err := tree.Remove(ctx, ExpressionPart{
GroupID: n.GroupID,
Predicate: *n.Predicate,
Parsed: parsed,
})
if err != nil {
return err
}
a.artIdents[n.Predicate.Ident] = tree
return nil
}

return fmt.Errorf("not implemented")
return errTreeUnimplemented
}

func isAggregateable(n *Node) bool {
Expand Down
Loading
Loading