Skip to content

Commit

Permalink
Rename dataspace to distribution and refactor relating code (#455)
Browse files Browse the repository at this point in the history
* Add rename dataspace to distribution and refactor related code

---------
Co-authored-by: Denis Volkov <[email protected]>
  • Loading branch information
reshke authored Jan 29, 2024
1 parent 8de3b58 commit 20e52fc
Show file tree
Hide file tree
Showing 98 changed files with 3,459 additions and 2,811 deletions.
4 changes: 2 additions & 2 deletions balancer/pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func (c BalancerClient) DB() string {
return "DefaultDB"
}

func (c BalancerClient) Dataspace() string {
func (c BalancerClient) Distribution() string {
return "default"
}

func (c BalancerClient) DataspaceIsDefault() bool {
func (c BalancerClient) DistributionIsDefault() bool {
return true
}

Expand Down
24 changes: 12 additions & 12 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,15 +444,15 @@ func (qc *qdbCoordinator) getAllListShardingRules(ctx context.Context) ([]*shrul
}

// TODO : unit tests
func (qc *qdbCoordinator) ListShardingRules(ctx context.Context, dataspace string) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListShardingRules(ctx, dataspace)
func (qc *qdbCoordinator) ListShardingRules(ctx context.Context, distribution string) ([]*shrule.ShardingRule, error) {
rulesList, err := qc.db.ListShardingRules(ctx, distribution)
if err != nil {
return nil, err
}

shRules := make([]*shrule.ShardingRule, 0, len(rulesList))
for _, rule := range rulesList {
if rule.DataspaceId == dataspace {
if rule.DistributionId == distribution {
shRules = append(shRules, shrule.ShardingRuleFromDB(rule))
}
}
Expand Down Expand Up @@ -579,8 +579,8 @@ func (qc *qdbCoordinator) AddKeyRange(ctx context.Context, keyRange *kr.KeyRange
}

// TODO : unit tests
func (qc *qdbCoordinator) ListKeyRanges(ctx context.Context, dataspace string) ([]*kr.KeyRange, error) {
keyRanges, err := qc.db.ListKeyRanges(ctx, dataspace)
func (qc *qdbCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) {
keyRanges, err := qc.db.ListKeyRanges(ctx, distribution)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -692,11 +692,11 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro

krNew := kr.KeyRangeFromDB(
&qdb.KeyRange{
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
DataspaceId: krOld.DataspaceId,
LowerBound: req.Bound,
UpperBound: krOld.UpperBound,
KeyRangeID: req.Krid,
ShardID: krOld.ShardID,
DistributionId: krOld.DistributionId,
},
)

Expand Down Expand Up @@ -844,8 +844,8 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
if err := qc.traverseRouters(ctx, func(cc *grpc.ClientConn) error {
cl := routerproto.NewKeyRangeServiceClient(cc)
resp, err := cl.MergeKeyRange(ctx, &routerproto.MergeKeyRangeRequest{
Bound: krRight.LowerBound,
Dataspace: krRight.DataspaceId,
Bound: krRight.LowerBound,
Distribution: krRight.DistributionId,
})

spqrlog.Zero.Debug().
Expand Down
8 changes: 4 additions & 4 deletions coordinator/provider/keyranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type CoordinatorService struct {
// TODO : unit tests
func (c *CoordinatorService) AddKeyRange(ctx context.Context, request *protos.AddKeyRangeRequest) (*protos.ModifyReply, error) {
err := c.impl.AddKeyRange(ctx, &kr.KeyRange{
LowerBound: []byte(request.KeyRangeInfo.KeyRange.LowerBound),
ID: request.KeyRangeInfo.Krid,
ShardID: request.KeyRangeInfo.ShardId,
Dataspace: "default",
LowerBound: []byte(request.KeyRangeInfo.KeyRange.LowerBound),
ID: request.KeyRangeInfo.Krid,
ShardID: request.KeyRangeInfo.ShardId,
Distribution: "default",
})
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions examples/2shardproxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ grpc_api_port: '7010'

show_notice_messages: false

time_quantiles:
- 0.75

world_shard_fallback: true
router_mode: PROXY

Expand Down
49 changes: 26 additions & 23 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/pg-sharding/spqr/pkg/models/spqrerror"

"github.com/pg-sharding/spqr/pkg"
"github.com/pg-sharding/spqr/pkg/models/dataspaces"
"github.com/pg-sharding/spqr/pkg/models/distributions"
"github.com/pg-sharding/spqr/pkg/models/topology"
"github.com/pg-sharding/spqr/pkg/pool"
"github.com/pg-sharding/spqr/pkg/shard"
Expand Down Expand Up @@ -60,13 +60,13 @@ func (pi *PSQLInteractor) CompleteMsg(rowCnt int) error {
}

// TODO : unit tests
func (pi *PSQLInteractor) GetDataspace() string {
return pi.cl.Dataspace()
func (pi *PSQLInteractor) GetDistribution() string {
return pi.cl.Distribution()
}

// TODO : unit tests
func (pi *PSQLInteractor) SetDataspace(dataspace string) {
pi.cl.SetDataspace(dataspace)
func (pi *PSQLInteractor) SetDistribution(distribution string) {
pi.cl.SetDistribution(distribution)
}

// TEXTOID https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat#L81
Expand Down Expand Up @@ -206,7 +206,7 @@ func (pi *PSQLInteractor) KeyRanges(krs []*kr.KeyRange) error {
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Key range ID"),
TextOidFD("Shard ID"),
TextOidFD("Dataspace ID"),
TextOidFD("Distribution ID"),
TextOidFD("Lower bound"),
},
},
Expand All @@ -222,7 +222,7 @@ func (pi *PSQLInteractor) KeyRanges(krs []*kr.KeyRange) error {
Values: [][]byte{
[]byte(keyRange.ID),
[]byte(keyRange.ShardID),
[]byte(keyRange.Dataspace),
[]byte(keyRange.Distribution),
keyRange.LowerBound,
},
}); err != nil {
Expand Down Expand Up @@ -494,7 +494,7 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
for _, msg := range []pgproto3.BackendMessage{
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Sharding Rule ID"),
TextOidFD("Dataspace ID"),
TextOidFD("Distribution ID"),
TextOidFD("Table Name"),
TextOidFD("Columns"),
TextOidFD("Hash Function"),
Expand Down Expand Up @@ -526,7 +526,7 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
if err := pi.cl.Send(&pgproto3.DataRow{
Values: [][]byte{
[]byte(rule.Id),
[]byte(rule.Dataspace),
[]byte(rule.Distribution),
[]byte(tableName),
[]byte(entries.String()),
[]byte(hashFunctions.String()),
Expand All @@ -541,21 +541,21 @@ func (pi *PSQLInteractor) ShardingRules(ctx context.Context, rules []*shrule.Sha
}

// TODO : unit tests
func (pi *PSQLInteractor) Dataspaces(ctx context.Context, dataspaces []*dataspaces.Dataspace) error {
func (pi *PSQLInteractor) Distributions(ctx context.Context, distributions []*distributions.Distribution) error {
for _, msg := range []pgproto3.BackendMessage{
&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
TextOidFD("Dataspace ID"),
TextOidFD("Distribution ID"),
}},
} {
if err := pi.cl.Send(msg); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}
for _, dataspace := range dataspaces {
for _, distribution := range distributions {
if err := pi.cl.Send(&pgproto3.DataRow{
Values: [][]byte{
[]byte(dataspace.Id),
[]byte(distribution.Id),
},
}); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
Expand Down Expand Up @@ -763,28 +763,28 @@ func (pi *PSQLInteractor) DropKeyRange(ctx context.Context, ids []string) error
}

// TODO : unit tests
func (pi *PSQLInteractor) AddDataspace(ctx context.Context, ks *dataspaces.Dataspace) error {
if err := pi.WriteHeader("add dataspace"); err != nil {
func (pi *PSQLInteractor) AddDistribution(ctx context.Context, ks *distributions.Distribution) error {
if err := pi.WriteHeader("add distribution"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

if err := pi.WriteDataRow(fmt.Sprintf("created dataspace with id %s", ks.ID())); err != nil {
if err := pi.WriteDataRow(fmt.Sprintf("created distribution with id %s", ks.ID())); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
return pi.CompleteMsg(0)
}

// TODO : unit tests
func (pi *PSQLInteractor) DropDataspace(ctx context.Context, ids []string) error {
if err := pi.WriteHeader("drop dataspace"); err != nil {
func (pi *PSQLInteractor) DropDistribution(ctx context.Context, ids []string) error {
if err := pi.WriteHeader("drop distribution"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

for _, id := range ids {
if err := pi.WriteDataRow(fmt.Sprintf("drop dataspace %s", id)); err != nil {
if err := pi.WriteDataRow(fmt.Sprintf("drop distribution %s", id)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
Expand All @@ -794,16 +794,19 @@ func (pi *PSQLInteractor) DropDataspace(ctx context.Context, ids []string) error
}

// TODO : unit tests
func (pi *PSQLInteractor) AttachTable(ctx context.Context, table string, ds *dataspaces.Dataspace) error {
func (pi *PSQLInteractor) AlterDistributionAttach(ctx context.Context, id string, ds []*distributions.DistributedRelation) error {
if err := pi.WriteHeader("attach table"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

if err := pi.WriteDataRow(fmt.Sprintf("attached table %s to dataspace %s", table, ds.ID())); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
for _, r := range ds {
if err := pi.WriteDataRow(fmt.Sprintf("attached relation %s to distribution %s", r.Name, id)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}
}

return pi.CompleteMsg(0)
}

Expand Down
Loading

0 comments on commit 20e52fc

Please sign in to comment.