Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: ctlstore oom #126

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 148 additions & 4 deletions ldb_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ import (
"context"
"database/sql"
"fmt"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"

"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/segmentio/errors-go"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"

"github.com/segmentio/ctlstore/pkg/errs"
"github.com/segmentio/ctlstore/pkg/globalstats"
"github.com/segmentio/ctlstore/pkg/ldb"
"github.com/segmentio/ctlstore/pkg/scanfunc"
"github.com/segmentio/ctlstore/pkg/schema"
"github.com/segmentio/ctlstore/pkg/sqlgen"
"github.com/segmentio/errors-go"
)

// LDBReader reads data from the LDB. The external interface is
Expand Down Expand Up @@ -198,6 +198,130 @@ func (reader *LDBReader) GetRowsByKeyPrefix(ctx context.Context, familyName stri
}
}

// GetRowsByKeyPrefixLike returns a *Rows iterator that will supply all of the rows in
// the family and table match the supplied primary key prefix.
func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName string, tableName string, key interface{}) (*Rows, error) {
ctx = discardContext()
start := time.Now()
defer func() {
globalstats.Observe("get_rows_by_key_prefix", time.Now().Sub(start),
stats.T("family", familyName),
stats.T("table", tableName))
}()

reader.mu.RLock()
defer reader.mu.RUnlock()
famName, err := schema.NewFamilyName(familyName)
if err != nil {
return nil, err
}
tblName, err := schema.NewTableName(tableName)
if err != nil {
return nil, err
}
ldbTable := schema.LDBTableName(famName, tblName)
pk, err := reader.getPrimaryKey(ctx, ldbTable)
if err != nil {
return nil, err
}
if pk.Zero() {
return nil, ErrTableHasNoPrimaryKey
}
//if len(key) > len(pk.Fields) {
// return nil, errors.New("too many keys supplied for table's primary key")
//}
err = convertOneKeyBeforeQuery(pk, key)
if err != nil {
return nil, err
}
stmt, err := reader.getRowsByKeyPrefixStmtLike(ctx, pk, ldbTable, 1)
if err != nil {
return nil, err
}
//if len(key) == 0 {
// globalstats.Incr("full-table-scans", familyName, tableName)
//}
// Initialize a slice to hold the modified keys
var modifiedKeys []interface{}

//for _, k := range key {
// Modify each key to include % characters
modifiedKey := "%" + key.(string) + "%" // Ensure k is converted to a string
modifiedKeys = append(modifiedKeys, modifiedKey)
//}
rows, err := stmt.QueryContext(ctx, modifiedKeys...)
switch {
case err == nil:
cols, err := schema.DBColumnMetaFromRows(rows)
if err != nil {
return nil, err
}
res := &Rows{rows: rows, cols: cols}
return res, nil
case err == sql.ErrNoRows:
return &Rows{}, nil
default:
return nil, err
}
}
func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk schema.PrimaryKey, ldbTable string, numKeys int) (*sql.Stmt, error) {
// assumes RLock is held
if reader.getRowsByKeyPrefixStmtCache == nil {
reader.mu.RUnlock()
reader.mu.Lock()
// double check because there could be a race which would result
// in us wiping out the cache
if reader.getRowsByKeyPrefixStmtCache == nil {
reader.getRowsByKeyPrefixStmtCache = make(map[prefixCacheKey]*sql.Stmt)
}
reader.mu.Unlock()
reader.mu.RLock()
}
pck := prefixCacheKey{ldbTableName: ldbTable, numKeys: numKeys}
stmt, found := reader.getRowsByKeyPrefixStmtCache[pck]
if found {
return stmt, nil
}

reader.mu.RUnlock()
defer reader.mu.RLock()
reader.mu.Lock()
defer reader.mu.Unlock()

qsTokens := []string{
"SELECT * FROM",
ldbTable,
}
if numKeys > 0 {
qsTokens = append(qsTokens, "WHERE")
events.Log("pk fields %v", pk.Fields)
//for i := 0; i < numKeys; i++ {
pkField := pk.Fields[2]
events.Log("pk fields %v", pk.Fields[2])
//if i > 0 {
// qsTokens = append(qsTokens, "AND")
//}
//operator := "="
/* for targetId using like operator */
//if i == 2 {
// operator = "LIKE"
//}
qsTokens = append(qsTokens,
pkField.Name,
"LIKE",
"?")
//}
}
qs := strings.Join(qsTokens, " ")
events.Log("query string is %v", qs)
events.Log("query token string is %v", qsTokens)
stmt, err := reader.Db.PrepareContext(ctx, qs)
if err == nil {
reader.getRowsByKeyPrefixStmtCache[pck] = stmt
}
return stmt, err
}

// GetRowByKey fetches a row from the supplied table by the key parameter,
// filling the data into the out param.
//
Expand Down Expand Up @@ -387,6 +511,26 @@ func convertKeyBeforeQuery(pk schema.PrimaryKey, key []interface{}) error {
return nil
}

// ensure that a supplied key is converted appropriately with respect
// to the type of each PK column.
func convertOneKeyBeforeQuery(pk schema.PrimaryKey, key interface{}) error {
//for i, k := range key {
// sanity check on th elength of the pk field type slice
//if i >= len(pk.Types) {
// return errors.New("insufficient key field type data")
//}
pkt := pk.Types[2]
switch k := key.(type) {
case string:
switch pkt {
case schema.FTBinary, schema.FTByteString:
// convert the key from a string -> []byte so that the
// types match, otherwise it won't find the row.
key = []byte(k)
}
}
return nil
}
func (reader *LDBReader) lock() {
reader.mu.Lock()
}
Expand Down