Skip to content

Commit

Permalink
test: fix client unittest (#643)
Browse files Browse the repository at this point in the history
# Description

fix unittest bug: The downstream be written twice in unittest case, fix
to read the recorder and assert the read result twice.
  • Loading branch information
woorui authored Oct 20, 2023
1 parent 82e17ff commit d4c8685
Showing 1 changed file with 31 additions and 14 deletions.
45 changes: 31 additions & 14 deletions core/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestFrameRoundTrip(t *testing.T) {
ctx := context.Background()

var (
observedTag = frame.Tag(1)
backflowTag = frame.Tag(2)
observedTag = frame.Tag(0x13)
backflowTag = frame.Tag(0x14)
payload = []byte("hello data frame")
backflow = []byte("hello backflow frame")
)
Expand Down Expand Up @@ -129,11 +129,21 @@ func TestFrameRoundTrip(t *testing.T) {
exited = checkClientExited(sfn, time.Second)
assert.False(t, exited, "sfn stream should not exited")

sfnMetaBytes, _ := NewDefaultMetadata(source.clientID, "tid", "sid", false).Encode()
sfnMd := NewDefaultMetadata(source.clientID, "sfn-tid", "sfn-sid", false)

err = sfn.WriteFrame(&frame.DataFrame{Tag: backflowTag, Metadata: sfnMetaBytes, Payload: backflow})
sfnMetaBytes, _ := sfnMd.Encode()

dataFrame := &frame.DataFrame{
Tag: backflowTag,
Metadata: sfnMetaBytes,
Payload: backflow,
}

err = sfn.WriteFrame(dataFrame)
assert.NoError(t, err)

assertDownstreamDataFrame(t, dataFrame.Tag, sfnMd, dataFrame.Payload, recorder)

stats := server.StatsFunctions()
nameList := []string{}
for _, name := range stats {
Expand All @@ -149,7 +159,7 @@ func TestFrameRoundTrip(t *testing.T) {
)
sourceMetaBytes, _ := md.Encode()

dataFrame := &frame.DataFrame{
dataFrame = &frame.DataFrame{
Tag: observedTag,
Metadata: sourceMetaBytes,
Payload: payload,
Expand All @@ -158,12 +168,7 @@ func TestFrameRoundTrip(t *testing.T) {
err = source.WriteFrame(dataFrame)
assert.NoError(t, err, "source write dataFrame must be success")

time.Sleep(2 * time.Second)

recordTag, recordMD, recordPayload := recorder.dataFrameContent()
assert.True(t, recordTag == dataFrame.Tag || recordTag == backflowTag)
assert.Equal(t, GetSourceIDFromMetadata(recordMD), source.clientID)
assert.True(t, bytes.Equal(recordPayload, dataFrame.Payload) || bytes.Equal(recordPayload, backflow))
assertDownstreamDataFrame(t, dataFrame.Tag, md, dataFrame.Payload, recorder)

assert.NoError(t, source.Close(), "source client.Close() should not return error")
assert.NoError(t, sfn.Close(), "sfn client.Close() should not return error")
Expand Down Expand Up @@ -257,17 +262,29 @@ func (w *frameWriterRecorder) WriteFrame(f frame.Frame) error {
defer w.mu.Unlock()

b, _ := w.codec.Encode(f)
_, err := w.buf.Write(b)
err := w.packetReader.WritePacket(w.buf, f.Type(), b)

return err
}

func (w *frameWriterRecorder) dataFrameContent() (frame.Tag, metadata.M, []byte) {
func (w *frameWriterRecorder) ReadFrameContent() (frame.Tag, metadata.M, []byte) {
w.mu.Lock()
defer w.mu.Unlock()

dataFrame := new(frame.DataFrame)
y3codec.Codec().Decode(w.buf.Bytes(), dataFrame)

_, bytes, _ := w.packetReader.ReadPacket(w.buf)
w.codec.Decode(bytes, dataFrame)
md, _ := metadata.Decode(dataFrame.Metadata)
return dataFrame.Tag, md, dataFrame.Payload
}

func assertDownstreamDataFrame(t *testing.T, tag uint32, md metadata.M, payload []byte, recorder *frameWriterRecorder) {
// wait for the downstream to finish writing.
time.Sleep(time.Second)

recordTag, recordMD, recordPayload := recorder.ReadFrameContent()
assert.Equal(t, recordTag, tag)
assert.Equal(t, recordMD, md)
assert.Equal(t, recordPayload, payload)
}

1 comment on commit d4c8685

@vercel
Copy link

@vercel vercel bot commented on d4c8685 Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

yomo – ./

yomo.vercel.app
yomo-yomorun.vercel.app
yomo-git-master-yomorun.vercel.app
www.yomo.run
yomo.run

Please sign in to comment.