Skip to content

Commit

Permalink
simple meta load testing
Browse files Browse the repository at this point in the history
  • Loading branch information
HarrisChu committed May 29, 2024
1 parent 183053c commit d8a7bb5
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 8 deletions.
11 changes: 11 additions & 0 deletions example/meta.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import meta from 'k6/x/nebulameta';

let client = meta.open("192.168.15.31", 10510)

export default function(data) {
client.auth("root", "nebula", "192.168.15.31:10210")
};

export function teardown() {
meta.close()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.8.2
github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28
github.com/vesoft-inc/nebula-go/v3 v3.6.1
go.k6.io/k6 v0.45.1
)
Expand All @@ -31,7 +32,6 @@ require (
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/vesoft-inc/fbthrift v0.0.0-20230214024353-fa2f34755b28 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
Expand Down
6 changes: 5 additions & 1 deletion pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type (
CsvOption `json:",inline"`
RetryOption `json:",inline"`
SSLOption `json:",inline"`
Http2Option `json:",inline"`
}

PoolOption struct {
Expand All @@ -68,7 +69,6 @@ type (
Username string `json:"username"`
Password string `json:"password"`
Space string `json:"space"`
UseHttp bool `json:"use_http"`
}

OutputOption struct {
Expand All @@ -94,6 +94,10 @@ type (
RetryIntervalUs int `json:"retry_interval_us"`
RetryTimeoutUs int `json:"retry_timeout_us"`
}
Http2Option struct {
HttpEnable bool `json:"http_enable"`
HttpHeader map[string][]string `json:"http_header"`
}
)

const (
Expand Down
11 changes: 5 additions & 6 deletions pkg/nebulagraph/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func (gp *GraphPool) initConnectionPool() error {
conf.MinConnPoolSize = gp.graphOption.MinSize
conf.TimeOut = time.Duration(gp.graphOption.TimeoutUs) * time.Microsecond
conf.IdleTime = time.Duration(gp.graphOption.IdleTimeUs) * time.Microsecond
conf.UseHTTP2 = gp.graphOption.HttpEnable
conf.HttpHeader = gp.graphOption.HttpHeader

var sslConfig *tls.Config
if gp.graphOption.SslCaPemPath != "" {
var err error
Expand All @@ -181,10 +184,6 @@ func (gp *GraphPool) initConnectionPool() error {
return err
}
}
if gp.graphOption.UseHttp {
conf.UseHTTP2 = true
}

pool, err := graph.NewSslConnectionPool(hosts, conf, sslConfig, graph.DefaultLogger{})
if err != nil {
return err
Expand Down Expand Up @@ -221,7 +220,8 @@ func (gp *GraphPool) initSessionPool() error {
graph.WithMaxSize(gp.graphOption.MaxSize),
graph.WithMinSize(gp.graphOption.MinSize),
graph.WithSSLConfig(sslConfig),
graph.WithHTTP2(gp.graphOption.UseHttp),
graph.WithHTTP2(gp.graphOption.HttpEnable),
graph.WithHttpHeader(gp.graphOption.HttpHeader),
)
if err != nil {
return err
Expand Down Expand Up @@ -383,7 +383,6 @@ func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {

// Execute executes nebula query
func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) {
stmt = common.ProcessStmt(stmt)
start := time.Now()
var (
o *output
Expand Down
92 changes: 92 additions & 0 deletions pkg/nebulameta/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package nebulameta

import (
"fmt"
"net"
"strconv"

"github.com/vesoft-inc/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
)

type module struct {
clients []*metaClient
}

type metaClient struct {
client *meta.MetaServiceClient
host string
port int
}

func New() *module {
return &module{}
}

func (c *module) Open(host string, port int) (*metaClient, error) {
client := &metaClient{
host: host,
port: port,
}
if err := client.open(); err != nil {
return nil, err
}
c.clients = append(c.clients, client)
return client, nil
}

func (c *metaClient) open() error {
newAdd := net.JoinHostPort(c.host, strconv.Itoa(c.port))
sock, err := thrift.NewSocket(thrift.SocketAddr(newAdd))
if err != nil {
return err
}
// Set transport
bufferSize := 128 << 10
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport := thrift.NewHeaderTransport(bufferedTranFactory.GetTransport(sock))
pf := thrift.NewHeaderProtocolFactory()

c.client = meta.NewMetaServiceClientFactory(transport, pf)
if err := transport.Open(); err != nil {
return err
}
return nil
}

func (c *metaClient) Auth(username, password string, graphAddr string) error {
req := meta.NewCreateSessionReq()
req.User = []byte(username)
h, p, err := net.SplitHostPort(graphAddr)
if err != nil {
return err
}
port, err := strconv.Atoi(p)
if err != nil {
return err
}
req.GraphAddr = &nebula.HostAddr{
Host: h,
Port: int32(port),
}
resp, err := c.client.CreateSession(req)

if err != nil {
return err
}
if resp.GetCode() != nebula.ErrorCode_SUCCEEDED {
return fmt.Errorf("auth failed, code: %d", resp.GetCode())
}
return nil
}

func (c *metaClient) Close() error {
return c.client.Close()
}

func (m *module) Close() {
for _, c := range m.clients {
c.Close()
}
}
2 changes: 2 additions & 0 deletions register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package k6plugin
import (
"github.com/vesoft-inc/k6-plugin/pkg/aggcsv"
"github.com/vesoft-inc/k6-plugin/pkg/nebulagraph"
"github.com/vesoft-inc/k6-plugin/pkg/nebulameta"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/output"
)

func init() {
modules.Register("k6/x/nebulagraph", nebulagraph.NewModule())
modules.Register("k6/x/nebulameta", nebulameta.New())
output.RegisterExtension("aggcsv", func(p output.Params) (output.Output, error) {
return aggcsv.New(p)
})
Expand Down

0 comments on commit d8a7bb5

Please sign in to comment.