From 193972925de4460e094009be1285077bcd531a33 Mon Sep 17 00:00:00 2001 From: Preety Papneja Date: Tue, 10 Oct 2023 15:35:40 +0530 Subject: [PATCH 1/5] test: Testing --- ldb_reader.go | 115 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 111 insertions(+), 4 deletions(-) diff --git a/ldb_reader.go b/ldb_reader.go index 1aea253c..c8c03c64 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -4,6 +4,9 @@ import ( "context" "database/sql" "fmt" + "github.com/segmentio/events/v2" + "github.com/segmentio/stats/v4" + "os" "path/filepath" "strconv" @@ -11,16 +14,13 @@ import ( "sync" "time" - "github.com/segmentio/errors-go" - "github.com/segmentio/events/v2" - "github.com/segmentio/stats/v4" - "github.com/segmentio/ctlstore/pkg/errs" "github.com/segmentio/ctlstore/pkg/globalstats" "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/scanfunc" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlgen" + "github.com/segmentio/errors-go" ) // LDBReader reads data from the LDB. The external interface is @@ -198,6 +198,113 @@ func (reader *LDBReader) GetRowsByKeyPrefix(ctx context.Context, familyName stri } } +// GetRowsByKeyPrefixLike returns a *Rows iterator that will supply all of the rows in +// the family and table match the supplied primary key prefix. +func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) { + ctx = discardContext() + start := time.Now() + defer func() { + globalstats.Observe("get_rows_by_key_prefix", time.Now().Sub(start), + stats.T("family", familyName), + stats.T("table", tableName)) + }() + + reader.mu.RLock() + defer reader.mu.RUnlock() + famName, err := schema.NewFamilyName(familyName) + if err != nil { + return nil, err + } + tblName, err := schema.NewTableName(tableName) + if err != nil { + return nil, err + } + ldbTable := schema.LDBTableName(famName, tblName) + pk, err := reader.getPrimaryKey(ctx, ldbTable) + if err != nil { + return nil, err + } + if pk.Zero() { + return nil, ErrTableHasNoPrimaryKey + } + if len(key) > len(pk.Fields) { + return nil, errors.New("too many keys supplied for table's primary key") + } + err = convertKeyBeforeQuery(pk, key) + if err != nil { + return nil, err + } + stmt, err := reader.getRowsByKeyPrefixStmtLike(ctx, pk, ldbTable, len(key)) + if err != nil { + return nil, err + } + if len(key) == 0 { + globalstats.Incr("full-table-scans", familyName, tableName) + } + rows, err := stmt.QueryContext(ctx, key...) + switch { + case err == nil: + cols, err := schema.DBColumnMetaFromRows(rows) + if err != nil { + return nil, err + } + res := &Rows{rows: rows, cols: cols} + return res, nil + case err == sql.ErrNoRows: + return &Rows{}, nil + default: + return nil, err + } +} +func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk schema.PrimaryKey, ldbTable string, numKeys int) (*sql.Stmt, error) { + // assumes RLock is held + if reader.getRowsByKeyPrefixStmtCache == nil { + reader.mu.RUnlock() + reader.mu.Lock() + // double check because there could be a race which would result + // in us wiping out the cache + if reader.getRowsByKeyPrefixStmtCache == nil { + reader.getRowsByKeyPrefixStmtCache = make(map[prefixCacheKey]*sql.Stmt) + } + reader.mu.Unlock() + reader.mu.RLock() + } + pck := prefixCacheKey{ldbTableName: ldbTable, numKeys: numKeys} + stmt, found := reader.getRowsByKeyPrefixStmtCache[pck] + if found { + return stmt, nil + } + + reader.mu.RUnlock() + defer reader.mu.RLock() + reader.mu.Lock() + defer reader.mu.Unlock() + + qsTokens := []string{ + "SELECT * FROM", + ldbTable, + } + if numKeys > 0 { + qsTokens = append(qsTokens, "WHERE") + for i := 0; i < numKeys; i++ { + pkField := pk.Fields[i] + if i > 0 { + qsTokens = append(qsTokens, "AND") + } + qsTokens = append(qsTokens, + pkField.Name, + "LIKE", + "?") + } + } + qs := strings.Join(qsTokens, " ") + stmt, err := reader.Db.PrepareContext(ctx, qs) + if err == nil { + reader.getRowsByKeyPrefixStmtCache[pck] = stmt + } + return stmt, err +} + // GetRowByKey fetches a row from the supplied table by the key parameter, // filling the data into the out param. // From 4d81616a0931582e5da16d7853c70d659cca1a1d Mon Sep 17 00:00:00 2001 From: Preety Papneja Date: Tue, 10 Oct 2023 21:56:51 +0530 Subject: [PATCH 2/5] test: Testing --- ldb_reader.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/ldb_reader.go b/ldb_reader.go index c8c03c64..6fc62451 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -241,7 +241,15 @@ func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName if len(key) == 0 { globalstats.Incr("full-table-scans", familyName, tableName) } - rows, err := stmt.QueryContext(ctx, key...) + // Initialize a slice to hold the modified keys + var modifiedKeys []interface{} + + for _, k := range key { + // Modify each key to include % characters + modifiedKey := "%" + k.(string) + "%" // Ensure k is converted to a string + modifiedKeys = append(modifiedKeys, modifiedKey) + } + rows, err := stmt.QueryContext(ctx, modifiedKeys...) switch { case err == nil: cols, err := schema.DBColumnMetaFromRows(rows) @@ -298,6 +306,8 @@ func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk sche } } qs := strings.Join(qsTokens, " ") + events.Log("query string is %v", qs) + events.Log("query token string is %v", qsTokens) stmt, err := reader.Db.PrepareContext(ctx, qs) if err == nil { reader.getRowsByKeyPrefixStmtCache[pck] = stmt From cefcc142b902acaf8dbe594eb085d88f758b5619 Mon Sep 17 00:00:00 2001 From: Preety Papneja Date: Tue, 10 Oct 2023 22:47:03 +0530 Subject: [PATCH 3/5] test: Testing --- ldb_reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldb_reader.go b/ldb_reader.go index 6fc62451..59d49d8a 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -294,13 +294,13 @@ func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk sche } if numKeys > 0 { qsTokens = append(qsTokens, "WHERE") + events.Log("pk fields %v", pk.Fields) for i := 0; i < numKeys; i++ { - pkField := pk.Fields[i] if i > 0 { qsTokens = append(qsTokens, "AND") } qsTokens = append(qsTokens, - pkField.Name, + "target_id", "LIKE", "?") } From edb9a2d87fedc59f9bd36af53cdaf757994aa5c4 Mon Sep 17 00:00:00 2001 From: Preety Papneja Date: Wed, 11 Oct 2023 01:34:55 +0530 Subject: [PATCH 4/5] test: Testing --- ldb_reader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ldb_reader.go b/ldb_reader.go index 59d49d8a..f4e5b1e0 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -296,11 +296,13 @@ func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk sche qsTokens = append(qsTokens, "WHERE") events.Log("pk fields %v", pk.Fields) for i := 0; i < numKeys; i++ { + pkField := pk.Fields[2] + events.Log("pk fields %v", pk.Fields[i]) if i > 0 { qsTokens = append(qsTokens, "AND") } qsTokens = append(qsTokens, - "target_id", + pkField.Name, "LIKE", "?") } From c6253d25b83a65cad4247594b135fcb7e4ee99e9 Mon Sep 17 00:00:00 2001 From: Preety Papneja Date: Wed, 11 Oct 2023 23:38:57 +0530 Subject: [PATCH 5/5] test: Testing --- ldb_reader.go | 75 ++++++++++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/ldb_reader.go b/ldb_reader.go index f4e5b1e0..a49690a8 100644 --- a/ldb_reader.go +++ b/ldb_reader.go @@ -200,7 +200,7 @@ func (reader *LDBReader) GetRowsByKeyPrefix(ctx context.Context, familyName stri // GetRowsByKeyPrefixLike returns a *Rows iterator that will supply all of the rows in // the family and table match the supplied primary key prefix. -func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) { +func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName string, tableName string, key interface{}) (*Rows, error) { ctx = discardContext() start := time.Now() defer func() { @@ -227,28 +227,28 @@ func (reader *LDBReader) GetRowsByKeyPrefixLike(ctx context.Context, familyName if pk.Zero() { return nil, ErrTableHasNoPrimaryKey } - if len(key) > len(pk.Fields) { - return nil, errors.New("too many keys supplied for table's primary key") - } - err = convertKeyBeforeQuery(pk, key) + //if len(key) > len(pk.Fields) { + // return nil, errors.New("too many keys supplied for table's primary key") + //} + err = convertOneKeyBeforeQuery(pk, key) if err != nil { return nil, err } - stmt, err := reader.getRowsByKeyPrefixStmtLike(ctx, pk, ldbTable, len(key)) + stmt, err := reader.getRowsByKeyPrefixStmtLike(ctx, pk, ldbTable, 1) if err != nil { return nil, err } - if len(key) == 0 { - globalstats.Incr("full-table-scans", familyName, tableName) - } + //if len(key) == 0 { + // globalstats.Incr("full-table-scans", familyName, tableName) + //} // Initialize a slice to hold the modified keys var modifiedKeys []interface{} - for _, k := range key { - // Modify each key to include % characters - modifiedKey := "%" + k.(string) + "%" // Ensure k is converted to a string - modifiedKeys = append(modifiedKeys, modifiedKey) - } + //for _, k := range key { + // Modify each key to include % characters + modifiedKey := "%" + key.(string) + "%" // Ensure k is converted to a string + modifiedKeys = append(modifiedKeys, modifiedKey) + //} rows, err := stmt.QueryContext(ctx, modifiedKeys...) switch { case err == nil: @@ -295,17 +295,22 @@ func (reader *LDBReader) getRowsByKeyPrefixStmtLike(ctx context.Context, pk sche if numKeys > 0 { qsTokens = append(qsTokens, "WHERE") events.Log("pk fields %v", pk.Fields) - for i := 0; i < numKeys; i++ { - pkField := pk.Fields[2] - events.Log("pk fields %v", pk.Fields[i]) - if i > 0 { - qsTokens = append(qsTokens, "AND") - } - qsTokens = append(qsTokens, - pkField.Name, - "LIKE", - "?") - } + //for i := 0; i < numKeys; i++ { + pkField := pk.Fields[2] + events.Log("pk fields %v", pk.Fields[2]) + //if i > 0 { + // qsTokens = append(qsTokens, "AND") + //} + //operator := "=" + /* for targetId using like operator */ + //if i == 2 { + // operator = "LIKE" + //} + qsTokens = append(qsTokens, + pkField.Name, + "LIKE", + "?") + //} } qs := strings.Join(qsTokens, " ") events.Log("query string is %v", qs) @@ -506,6 +511,26 @@ func convertKeyBeforeQuery(pk schema.PrimaryKey, key []interface{}) error { return nil } +// ensure that a supplied key is converted appropriately with respect +// to the type of each PK column. +func convertOneKeyBeforeQuery(pk schema.PrimaryKey, key interface{}) error { + //for i, k := range key { + // sanity check on th elength of the pk field type slice + //if i >= len(pk.Types) { + // return errors.New("insufficient key field type data") + //} + pkt := pk.Types[2] + switch k := key.(type) { + case string: + switch pkt { + case schema.FTBinary, schema.FTByteString: + // convert the key from a string -> []byte so that the + // types match, otherwise it won't find the row. + key = []byte(k) + } + } + return nil +} func (reader *LDBReader) lock() { reader.mu.Lock() }