Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(log): better log outputs when dispatching to downstreams #644

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 32 additions & 40 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import (
// Client is the abstraction of a YoMo-Client. a YoMo-Client can be
// Source, Upstream Zipper or StreamFunction.
type Client struct {
zipperAddr string
name string // name of the client
clientID string // id of the client
clientType ClientType // type of the client
processor func(*frame.DataFrame) // function to invoke when data arrived
receiver func(*frame.BackflowFrame) // function to invoke when data is processed
errorfn func(error) // function to invoke when error occured
opts *clientOptions
logger *slog.Logger
Logger *slog.Logger
tracerProvider oteltrace.TracerProvider

// ctx and ctxCancel manage the lifecycle of client.
Expand All @@ -39,30 +40,27 @@ type Client struct {
}

// NewClient creates a new YoMo-Client.
func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client {
func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client {
option := defaultClientOption()

for _, o := range opts {
o(option)
}
clientID := id.New()

logger := option.logger.With("component", clientType.String(), "client_id", clientID, "client_name", appName)

if option.credential != nil {
logger.Info("use credential", "credential_name", option.credential.Name())
}
logger := option.logger

ctx, ctxCancel := context.WithCancelCause(context.Background())

return &Client{
zipperAddr: zipperAddr,
name: appName,
clientID: clientID,
processor: func(df *frame.DataFrame) { logger.Warn("the processor has not been set") },
receiver: func(bf *frame.BackflowFrame) { logger.Warn("the receiver has not been set") },
clientType: clientType,
opts: option,
logger: logger,
Logger: logger,
tracerProvider: option.tracerProvider,
errorfn: func(err error) { logger.Error("client err", "err", err) },
writeFrameChan: make(chan frame.Frame),
Expand All @@ -85,8 +83,8 @@ func newConnectResult(conn quic.Connection, fs *FrameStream, err error) *connect
}
}

func (c *Client) connect(ctx context.Context, addr string) *connectResult {
conn, err := quic.DialAddr(ctx, addr, c.opts.tlsConfig, c.opts.quicConfig)
func (c *Client) connect(ctx context.Context) *connectResult {
conn, err := quic.DialAddr(ctx, c.zipperAddr, c.opts.tlsConfig, c.opts.quicConfig)
if err != nil {
return newConnectResult(conn, nil, err)
}
Expand All @@ -98,6 +96,10 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {

fs := NewFrameStream(stream, y3codec.Codec(), y3codec.PacketReadWriter())

if credential := c.opts.credential; credential != nil {
c.Logger.Info("use credential", "credential_name", credential.Name())
}

hf := &frame.HandshakeFrame{
Name: c.name,
ID: c.clientID,
Expand Down Expand Up @@ -130,7 +132,7 @@ func (c *Client) connect(ctx context.Context, addr string) *connectResult {
}
}

func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Connection, fs *FrameStream) {
func (c *Client) runBackground(ctx context.Context, conn quic.Connection, fs *FrameStream) {
reconnection := make(chan struct{})

go c.handleReadFrames(fs, reconnection)
Expand All @@ -149,12 +151,12 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}
case <-reconnection:
reconnect:
cr := c.connect(ctx, addr)
cr := c.connect(ctx)
if err := cr.err; err != nil {
if errors.As(err, new(ErrAuthenticateFailed)) {
return
}
c.logger.Error("reconnect to zipper error", "err", cr.err)
c.Logger.Error("reconnect to zipper error", "err", cr.err)
time.Sleep(time.Second)
goto reconnect
}
Expand All @@ -165,27 +167,21 @@ func (c *Client) runBackground(ctx context.Context, addr string, conn quic.Conne
}

// Connect connect client to server.
func (c *Client) Connect(ctx context.Context, addr string) error {
if c.clientType == ClientTypeStreamFunction && len(c.opts.observeDataTags) == 0 {
return errors.New("yomo: streamFunction cannot observe data because the required tag has not been set")
}

c.logger = c.logger.With("zipper_addr", addr)

func (c *Client) Connect(ctx context.Context) error {
connect:
result := c.connect(ctx, addr)
result := c.connect(ctx)
if result.err != nil {
if c.opts.connectUntilSucceed {
c.logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
c.Logger.Error("failed to connect to zipper, trying to reconnect", "err", result.err)
time.Sleep(time.Second)
goto connect
}
c.logger.Error("can not connect to zipper", "err", result.err)
c.Logger.Error("can not connect to zipper", "err", result.err)
return result.err
}
c.logger.Info("connected to zipper")
c.Logger.Info("connected to zipper")

go c.runBackground(ctx, addr, result.conn, result.fs)
go c.runBackground(ctx, result.conn, result.fs)

return nil
}
Expand Down Expand Up @@ -217,7 +213,7 @@ func (c *Client) nonBlockWriteFrame(f frame.Frame) error {
return nil
default:
err := errors.New("yomo: client has lost connection")
c.logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
c.Logger.Debug("failed to write frame", "frame_type", f.Type().String(), "error", err)
return err
}
}
Expand Down Expand Up @@ -275,7 +271,7 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
buf = buf[:runtime.Stack(buf, false)]

perr := fmt.Errorf("%v", e)
c.logger.Error("stream panic", "err", perr)
c.Logger.Error("stream panic", "err", perr)
c.errorfn(fmt.Errorf("yomo: stream panic: %v\n%s", perr, buf))
}
}()
Expand All @@ -287,14 +283,14 @@ func (c *Client) handleReadFrames(fs *FrameStream, reconnection chan struct{}) {
func (c *Client) handleFrame(f frame.Frame) {
switch ff := f.(type) {
case *frame.RejectedFrame:
c.logger.Error("rejected error", "err", ff.Message)
c.Logger.Error("rejected error", "err", ff.Message)
_ = c.Close()
case *frame.DataFrame:
c.processor(ff)
case *frame.BackflowFrame:
c.receiver(ff)
default:
c.logger.Warn("received unexpected frame", "frame_type", f.Type().String())
c.Logger.Warn("received unexpected frame", "frame_type", f.Type().String())
}
}

Expand All @@ -313,15 +309,10 @@ func (c *Client) SetObserveDataTags(tag ...frame.Tag) {
c.opts.observeDataTags = tag
}

// Logger get client's logger instance, you can customize this using `yomo.WithLogger`
func (c *Client) Logger() *slog.Logger {
return c.logger
}

// SetErrorHandler set error handler
func (c *Client) SetErrorHandler(fn func(err error)) {
c.errorfn = fn
c.logger.Debug("the error handler has been set")
c.Logger.Debug("the error handler has been set")
}

// ClientID returns the ID of client.
Expand All @@ -330,13 +321,14 @@ func (c *Client) ClientID() string { return c.clientID }
// Name returns the name of client.
func (c *Client) Name() string { return c.name }

// FrameWriterConnection represents a frame writer that can connect to an addr.
type FrameWriterConnection interface {
// Downstream represents a frame writer that can connect to an addr.
type Downstream interface {
frame.Writer
ClientID() string
Name() string
ID() string
LocalName() string
RemoteName() string
Close() error
Connect(context.Context, string) error
Connect(context.Context) error
}

// TracerProvider returns the tracer provider of client.
Expand Down
48 changes: 26 additions & 22 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/yomorun/yomo/core/router"
"github.com/yomorun/yomo/core/ylog"
"github.com/yomorun/yomo/pkg/frame-codec/y3codec"
"github.com/yomorun/yomo/pkg/id"
)

const testaddr = "127.0.0.1:19999"
Expand All @@ -29,8 +28,8 @@ var discardingLogger = ylog.NewFromConfig(ylog.Config{Output: "/dev/null", Error
func TestClientDialNothing(t *testing.T) {
ctx := context.Background()

client := NewClient("source", ClientTypeSource, WithLogger(discardingLogger))
err := client.Connect(ctx, testaddr)
client := NewClient("source", testaddr, ClientTypeSource, WithLogger(discardingLogger))
err := client.Connect(ctx)

qerr := net.ErrClosed
assert.ErrorAs(t, err, &qerr, "dial must timeout")
Expand Down Expand Up @@ -60,22 +59,23 @@ func TestFrameRoundTrip(t *testing.T) {
server.SetBeforeHandlers(ht.beforeHandler)
server.SetAfterHandlers(ht.afterHandler)

recorder := newFrameWriterRecorder("mockClient")
server.AddDownstreamServer("mockAddr", recorder)
recorder := newFrameWriterRecorder("mockID", "mockClientLocal", "mockClientRemote")
server.AddDownstreamServer(recorder)

assert.Equal(t, server.Downstreams()["mockAddr"], recorder.ClientID())
assert.Equal(t, server.Downstreams()["mockID"], recorder.ID())

go func() {
err := server.ListenAndServe(ctx, testaddr)
fmt.Println(err)
}()

illegalTokenSource := NewClient("source", ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger))
err := illegalTokenSource.Connect(ctx, testaddr)
illegalTokenSource := NewClient("source", testaddr, ClientTypeSource, WithCredential("token:error-token"), WithLogger(discardingLogger))
err := illegalTokenSource.Connect(ctx)
assert.Equal(t, "authentication failed: client credential name is token", err.Error())

source := NewClient(
"source",
testaddr,
ClientTypeSource,
WithCredential("token:auth-token"),
WithClientQuicConfig(DefalutQuicConfig),
Expand All @@ -90,10 +90,10 @@ func TestFrameRoundTrip(t *testing.T) {
assert.Equal(t, string(backflow), string(bf.Carriage))
})

err = source.Connect(ctx, testaddr)
err = source.Connect(ctx)
assert.NoError(t, err, "source connect must be success")
closeEarlySfn := createTestStreamFunction("close-early-sfn", observedTag)
closeEarlySfn.Connect(ctx, testaddr)
closeEarlySfn := createTestStreamFunction("close-early-sfn", testaddr, observedTag)
closeEarlySfn.Connect(ctx)
assert.Equal(t, nil, err)

// test close early.
Expand All @@ -104,7 +104,7 @@ func TestFrameRoundTrip(t *testing.T) {
assert.True(t, exited, "close-early-sfn should exited")

// sfn to zipper.
sfn := createTestStreamFunction("sfn-1", observedTag)
sfn := createTestStreamFunction("sfn-1", testaddr, observedTag)
sfn.SetDataFrameObserver(func(bf *frame.DataFrame) {
assert.Equal(t, string(payload), string(bf.Payload))

Expand All @@ -123,7 +123,7 @@ func TestFrameRoundTrip(t *testing.T) {
}
})

err = sfn.Connect(ctx, testaddr)
err = sfn.Connect(ctx)
assert.NoError(t, err, "sfn connect should replace the old sfn stream")

exited = checkClientExited(sfn, time.Second)
Expand Down Expand Up @@ -211,9 +211,10 @@ func (a *hookTester) afterHandler(ctx *Context) error {
return nil
}

func createTestStreamFunction(name string, observedTag frame.Tag) *Client {
func createTestStreamFunction(name string, zipperAddr string, observedTag frame.Tag) *Client {
sfn := NewClient(
name,
zipperAddr,
ClientTypeStreamFunction,
WithCredential("token:auth-token"),
WithLogger(discardingLogger),
Expand All @@ -226,27 +227,30 @@ func createTestStreamFunction(name string, observedTag frame.Tag) *Client {
// frameWriterRecorder frames be writen.
type frameWriterRecorder struct {
id string
name string
localName string
remoteName string
codec frame.Codec
packetReader frame.PacketReadWriter
mu sync.Mutex
buf *bytes.Buffer
}

func newFrameWriterRecorder(name string) *frameWriterRecorder {
func newFrameWriterRecorder(id, localName, remoteName string) *frameWriterRecorder {
return &frameWriterRecorder{
id: id.New(),
name: name,
id: id,
localName: localName,
remoteName: remoteName,
codec: y3codec.Codec(),
packetReader: y3codec.PacketReadWriter(),
buf: new(bytes.Buffer),
}
}

func (w *frameWriterRecorder) ClientID() string { return w.id }
func (w *frameWriterRecorder) Name() string { return w.name }
func (w *frameWriterRecorder) Close() error { return nil }
func (w *frameWriterRecorder) Connect(_ context.Context, _ string) error { return nil }
func (w *frameWriterRecorder) ID() string { return w.id }
func (w *frameWriterRecorder) LocalName() string { return w.localName }
func (w *frameWriterRecorder) RemoteName() string { return w.remoteName }
func (w *frameWriterRecorder) Close() error { return nil }
func (w *frameWriterRecorder) Connect(_ context.Context) error { return nil }

func (w *frameWriterRecorder) WriteFrame(f frame.Frame) error {
w.mu.Lock()
Expand Down
15 changes: 0 additions & 15 deletions core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"github.com/yomorun/yomo/core/metadata"
"golang.org/x/exp/slog"
)

const (
Expand Down Expand Up @@ -68,17 +67,3 @@ func SetTracedToMetadata(m metadata.M, traced bool) {
}
m.Set(MetaTraced, tracedString)
}

// MetadataSlogAttr returns slog.Attr from metadata.
func MetadataSlogAttr(md metadata.M) slog.Attr {
kvStrings := make([]any, len(md)*2)
i := 0
for k, v := range md {
kvStrings[i] = k
i++
kvStrings[i] = v
i++
}

return slog.Group("metadata", kvStrings...)
}
Loading