diff --git a/benchmarks/dumb-server/.gitignore b/benchmarks/dumb-server/.gitignore new file mode 100644 index 0000000..d287cd3 --- /dev/null +++ b/benchmarks/dumb-server/.gitignore @@ -0,0 +1 @@ +debug diff --git a/benchmarks/dumb-server/debug/main.go b/benchmarks/dumb-server/debug/main.go deleted file mode 100644 index 5cacaa0..0000000 --- a/benchmarks/dumb-server/debug/main.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import ( - "os" - - "golang.org/x/net/http2" -) - -func main() { - f, _ := os.Open("/tmp/debug") - framer := http2.NewFramer(nil, f) - for { - fr, err := framer.ReadFrame() - if err != nil { - panic(err) - } - println(fr.Header().String()) - } -} - -// func main2() { -// conn, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials())) -// if err != nil { -// panic(err) -// } -// -// client := pb.NewTestApiClient(conn) -// ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) -// defer cancel() -// -// for i := 0; i < 10_000; i++ { -// _, err := client.Test(ctx, &pb.TestRequest{}) -// if err != nil { -// panic(err) -// } -// } -// } diff --git a/benchmarks/dumb-server/pb/api.pb.go b/benchmarks/dumb-server/pb/api.pb.go index aeac744..a494dfe 100644 --- a/benchmarks/dumb-server/pb/api.pb.go +++ b/benchmarks/dumb-server/pb/api.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.34.2 // protoc v3.21.12 // source: api.proto @@ -133,7 +133,7 @@ func file_api_proto_rawDescGZIP() []byte { } var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_api_proto_goTypes = []interface{}{ +var file_api_proto_goTypes = []any{ (*TestRequest)(nil), // 0: test.api.TestRequest (*EmptyResponse)(nil), // 1: test.api.EmptyResponse } @@ -153,7 +153,7 @@ func file_api_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_api_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*TestRequest); i { case 0: return &v.state @@ -165,7 +165,7 @@ func file_api_proto_init() { return nil } } - file_api_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*EmptyResponse); i { case 0: return &v.state diff --git a/benchmarks/dumb-server/pb/api_grpc.pb.go b/benchmarks/dumb-server/pb/api_grpc.pb.go index 400e061..c109b14 100644 --- a/benchmarks/dumb-server/pb/api_grpc.pb.go +++ b/benchmarks/dumb-server/pb/api_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 +// - protoc-gen-go-grpc v1.3.0 // - protoc v3.21.12 // source: api.proto @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 const ( TestApi_Test_FullMethodName = "/test.api.TestApi/Test" @@ -38,9 +38,8 @@ func NewTestApiClient(cc grpc.ClientConnInterface) TestApiClient { } func (c *testApiClient) Test(ctx context.Context, in *TestRequest, opts ...grpc.CallOption) (*EmptyResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(EmptyResponse) - err := c.cc.Invoke(ctx, TestApi_Test_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, TestApi_Test_FullMethodName, in, out, opts...) if err != nil { return nil, err } diff --git a/benchmarks/jmeter-java-dsl/docker-compose.yaml b/benchmarks/jmeter-java-dsl/docker-compose.yaml index e073000..625db22 100644 --- a/benchmarks/jmeter-java-dsl/docker-compose.yaml +++ b/benchmarks/jmeter-java-dsl/docker-compose.yaml @@ -17,4 +17,4 @@ services: resources: limits: cpus: '2' - memory: 3G + memory: 2G diff --git a/cmd/framer/cmd_convert.go b/cmd/framer/cmd_convert.go index 6366b4c..9dc42f1 100644 --- a/cmd/framer/cmd_convert.go +++ b/cmd/framer/cmd_convert.go @@ -64,7 +64,7 @@ func (c *ConvertCommand) Run(ctx context.Context) error { var r8n reflection.DynamicMessagesStore if c.ReflectionAddr != "" { - conn, err := grpc.NewClient( + conn, err := grpc.Dial( c.ReflectionAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUserAgent("framer"), diff --git a/cmd/framer/cmd_load.go b/cmd/framer/cmd_load.go index ab48667..c622cf4 100644 --- a/cmd/framer/cmd_load.go +++ b/cmd/framer/cmd_load.go @@ -132,18 +132,18 @@ func (c *LoadCommand) Run( g, ctx := errgroup.WithContext(ctx) - timeout := consts.DefaultTimeout - var reporter types.Reporter = supersimpleReporter.New(timeout) + var reporter types.Reporter = supersimpleReporter.New() if c.Phout != "" { f, err := os.Create(c.Phout) if err != nil { return fmt.Errorf("creating phout file(%s): %w", c.Phout, err) } - phoutReporter := phoutReporter.New(f, timeout) + phoutReporter := phoutReporter.New(f) reporter = multi.NewMutli(phoutReporter, reporter) } g.Go(reporter.Run) + timeout := consts.DefaultTimeout loaders := make([]*loader.Loader, clients) for i := 0; i < clients; i++ { conn, err := createConn(ctx, timeout, addr) @@ -154,6 +154,7 @@ func (c *LoadCommand) Run( conn, reporter, timeout, + false, log, ) if err != nil { diff --git a/cmd/framer/main_benchmark_test.go b/cmd/framer/main_benchmark_test.go index 5ad1d1a..8f94c43 100644 --- a/cmd/framer/main_benchmark_test.go +++ b/cmd/framer/main_benchmark_test.go @@ -66,6 +66,7 @@ func BenchmarkE2E(b *testing.B) { conn, reporter, consts.DefaultTimeout, + false, zaptest.NewLogger(b), ) if err != nil { @@ -129,6 +130,7 @@ func BenchmarkE2EInMemDatasource(b *testing.B) { conn, reporter, consts.DefaultTimeout, + false, zaptest.NewLogger(b), ) a.NoError(err) diff --git a/consts/consts.go b/consts/consts.go index d6939a7..6417caf 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -1,9 +1,11 @@ package consts -import "time" +import ( + "math" + "time" +) const ( - ChunksBufferSize = 2048 RecieveBufferSize = 2048 SendBatchTimeout = time.Millisecond RecieveBatchTimeout = time.Millisecond @@ -11,4 +13,5 @@ const ( DefaultInitialWindowSize = 65_535 DefaultTimeout = 11 * time.Second DefaultMaxFrameSize = 16384 // Максимальная длина пейлоада фрейма в grpc. У http2 ограничение больше. + DefaultMaxHeaderListSize = math.MaxUint32 ) diff --git a/datasource/decoder/decoder.go b/datasource/decoder/decoder.go new file mode 100644 index 0000000..4af614e --- /dev/null +++ b/datasource/decoder/decoder.go @@ -0,0 +1,52 @@ +package decoder + +import ( + "bytes" + "fmt" + + "github.com/ozontech/framer/utils/lru" +) + +type Decoder struct { + metaUnmarshaler *SafeMultiValDecoder + + tagsLRU *lru.LRU + methodsLRU *lru.LRU +} + +func NewDecoder() *Decoder { + return &Decoder{ + metaUnmarshaler: NewSafeMultiValDecoder(), + + tagsLRU: lru.New(1024), + methodsLRU: lru.New(1024), + } +} + +func (decoder *Decoder) Unmarshal(d *Data, b []byte) error { + d.Reset() + + tagB, b := nextLine(b) + d.Tag = decoder.tagsLRU.GetOrAdd(tagB) + + methodB, b := nextLine(b) + d.Method = decoder.methodsLRU.GetOrAdd(methodB) + + metaBytes, b := nextLine(b) + var err error + d.Metadata, err = decoder.metaUnmarshaler.UnmarshalAppend(d.Metadata, metaBytes) + if err != nil { + return fmt.Errorf("meta unmarshal error: %w", err) + } + + d.Message = b + return nil +} + +func nextLine(in []byte) ([]byte, []byte) { + index := bytes.IndexByte(in, '\n') + if index == -1 { + return []byte{}, []byte{} + } + return in[:index], in[index+1:] +} diff --git a/datasource/decoder/jsonkv_safe.go b/datasource/decoder/jsonkv_safe.go new file mode 100644 index 0000000..db05930 --- /dev/null +++ b/datasource/decoder/jsonkv_safe.go @@ -0,0 +1,47 @@ +package decoder + +import ( + "github.com/mailru/easyjson/jlexer" + "github.com/ozontech/framer/utils/lru" +) + +const ( + kLRUSize = 1 << 10 + vLRUSize = 1 << 16 +) + +type SafeMultiValDecoder struct { + kLRU *lru.LRU + vLRU *lru.LRU +} + +func NewSafeMultiValDecoder() *SafeMultiValDecoder { + return &SafeMultiValDecoder{ + lru.New(kLRUSize), + lru.New(vLRUSize), + } +} + +func (d *SafeMultiValDecoder) UnmarshalAppend(buf []Meta, bytes []byte) ([]Meta, error) { + in := jlexer.Lexer{Data: bytes} + + in.Delim('{') + for !in.IsDelim('}') { + key := d.kLRU.GetOrAdd(in.UnsafeBytes()) + in.WantColon() + + in.Delim('[') + for !in.IsDelim(']') { + val := d.kLRU.GetOrAdd(in.UnsafeBytes()) + buf = append(buf, Meta{Name: key, Value: val}) + in.WantComma() + } + in.Delim(']') + + in.WantComma() + } + in.Delim('}') + in.Consumed() + + return buf, in.Error() +} diff --git a/datasource/decoder/model.go b/datasource/decoder/model.go new file mode 100644 index 0000000..adf4aab --- /dev/null +++ b/datasource/decoder/model.go @@ -0,0 +1,20 @@ +package decoder + +type Meta struct { + Name string + Value string +} + +type Data struct { + Tag string + Method string // "/" {service name} "/" {method name} + Metadata []Meta + Message []byte +} + +func (d *Data) Reset() { + d.Tag = "" + d.Method = "" + d.Metadata = d.Metadata[:0] + d.Message = d.Message[:0] +} diff --git a/datasource/file.go b/datasource/file.go index f508a20..bcb6008 100644 --- a/datasource/file.go +++ b/datasource/file.go @@ -5,6 +5,7 @@ import ( "io" "sync" + "github.com/ozontech/framer/datasource/decoder" "github.com/ozontech/framer/formats/grpc/ozon/binary" "github.com/ozontech/framer/formats/model" "github.com/ozontech/framer/loader/types" @@ -12,7 +13,9 @@ import ( ) type FileDataSource struct { - format *model.InputFormat + reader model.PooledRequestReader + decoder *decoder.Decoder + pool *pool.SlicePool[*fileRequest] factory *RequestAdapterFactory mu sync.Mutex @@ -20,7 +23,9 @@ type FileDataSource struct { func NewFileDataSource(r io.Reader, factoryOptions ...Option) *FileDataSource { ds := &FileDataSource{ - binary.NewInput(r), + binary.NewInput(r).Reader, + decoder.NewDecoder(), + pool.NewSlicePoolSize[*fileRequest](100), NewRequestAdapterFactory(factoryOptions...), sync.Mutex{}, @@ -32,7 +37,7 @@ func (ds *FileDataSource) Fetch() (types.Req, error) { r, ok := ds.pool.Acquire() if !ok { r = &fileRequest{ - bytesPool: ds.format.Reader, + bytesPool: ds.reader, pool: ds.pool, RequestAdapter: ds.factory.Build(), } @@ -40,17 +45,17 @@ func (ds *FileDataSource) Fetch() (types.Req, error) { var err error ds.mu.Lock() - r.bytes, err = ds.format.Reader.ReadNext() + r.bytes, err = ds.reader.ReadNext() ds.mu.Unlock() if err != nil { return nil, fmt.Errorf("read next request: %w", err) } - return r, ds.format.Decoder.Unmarshal(&r.data, r.bytes) + return r, ds.decoder.Unmarshal(&r.data, r.bytes) } type fileRequest struct { - bytesPool model.PooledRequestReder + bytesPool model.PooledRequestReader bytes []byte pool *pool.SlicePool[*fileRequest] *RequestAdapter diff --git a/datasource/file_benchmark_test.go b/datasource/file_benchmark_test.go index 5054497..e41d8d7 100644 --- a/datasource/file_benchmark_test.go +++ b/datasource/file_benchmark_test.go @@ -12,8 +12,8 @@ import ( type noopHpackFieldWriter struct{} -func (*noopHpackFieldWriter) WriteField(string, string) error { return nil } -func (*noopHpackFieldWriter) SetWriter(io.Writer) {} +func (*noopHpackFieldWriter) WriteField(string, string) {} +func (*noopHpackFieldWriter) SetWriter(io.Writer) {} func BenchmarkFileDataSource(b *testing.B) { f, err := os.Open("../test_files/requests") @@ -28,7 +28,7 @@ func BenchmarkFileDataSource(b *testing.B) { go func() { defer close(done) for r := range rr { - r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, consts.DefaultMaxHeaderListSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) r.Release() } @@ -61,7 +61,7 @@ func BenchmarkRequestSetupNoop(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, consts.DefaultMaxHeaderListSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) } } @@ -82,7 +82,7 @@ func BenchmarkRequestSetupHpack(b *testing.B) { hpackwrapper := hpackwrapper.NewWrapper() b.ResetTimer() for i := 0; i < b.N; i++ { - r.SetUp(consts.DefaultMaxFrameSize, 0, hpackwrapper) + r.SetUp(consts.DefaultMaxFrameSize, consts.DefaultMaxHeaderListSize, 0, hpackwrapper) b.SetBytes(int64(r.Size())) } } diff --git a/datasource/inmem.go b/datasource/inmem.go index 5e7ea3f..20a57a4 100644 --- a/datasource/inmem.go +++ b/datasource/inmem.go @@ -6,6 +6,7 @@ import ( "io" "sync/atomic" + "github.com/ozontech/framer/datasource/decoder" "github.com/ozontech/framer/formats/grpc/ozon/binary" "github.com/ozontech/framer/formats/model" "github.com/ozontech/framer/loader/types" @@ -13,16 +14,20 @@ import ( ) type InmemDataSource struct { - format *model.InputFormat + reader model.PooledRequestReader + decoder *decoder.Decoder + factory *RequestAdapterFactory pool *pool.SlicePool[*memRequest] i atomic.Int32 - datas []model.Data + datas []decoder.Data } func NewInmemDataSource(r io.Reader, factoryOptions ...Option) *InmemDataSource { return &InmemDataSource{ - binary.NewInput(r), + binary.NewInput(r).Reader, + decoder.NewDecoder(), + NewRequestAdapterFactory(factoryOptions...), pool.NewSlicePoolSize[*memRequest](100), atomic.Int32{}, @@ -34,15 +39,15 @@ func (ds *InmemDataSource) Init() error { var b []byte var err error for { - b, err = ds.format.Reader.ReadNext() + b, err = ds.reader.ReadNext() if err != nil { if errors.Is(err, io.EOF) { break } return fmt.Errorf("read next request: %w", err) } - var data model.Data - err = ds.format.Decoder.Unmarshal(&data, b) + var data decoder.Data + err = ds.decoder.Unmarshal(&data, b) if err != nil { return fmt.Errorf("read next request: %w", err) } diff --git a/datasource/inmem_benchmark_test.go b/datasource/inmem_benchmark_test.go index 0247766..10b1f69 100644 --- a/datasource/inmem_benchmark_test.go +++ b/datasource/inmem_benchmark_test.go @@ -34,7 +34,7 @@ func BenchmarkInmemDataSource(b *testing.B) { }() for r := range rr { - r.SetUp(consts.DefaultMaxFrameSize, 0, &noopHpackFieldWriter{}) + r.SetUp(consts.DefaultMaxFrameSize, consts.DefaultMaxHeaderListSize, 0, &noopHpackFieldWriter{}) b.SetBytes(int64(r.Size())) r.Release() } diff --git a/datasource/meta_middleware.go b/datasource/meta_middleware.go new file mode 100644 index 0000000..2bd473a --- /dev/null +++ b/datasource/meta_middleware.go @@ -0,0 +1,83 @@ +package datasource + +import ( + "strings" + + "github.com/ozontech/framer/loader/types" +) + +type MetaMiddleware interface { + IsAllowed(key string) bool + WriteAdditional(types.HPackFieldWriter) +} + +type noopMiddleware struct{} + +func (noopMiddleware) IsAllowed(key string) bool { return true } +func (noopMiddleware) WriteAdditional(types.HPackFieldWriter) {} + +type defaultMiddleware struct { + staticPseudo []string + staticRegular []string + next MetaMiddleware +} + +func newDefaultMiddleware(next MetaMiddleware, additionalHeaders ...string) *defaultMiddleware { + m := &defaultMiddleware{ + staticPseudo: []string{ + ":method", "POST", + ":scheme", "http", + }, + staticRegular: []string{ + "content-type", "application/grpc", + "te", "trailers", + }, + next: noopMiddleware{}, + } + + for i := 0; i < len(additionalHeaders); i += 2 { + k, v := additionalHeaders[i], additionalHeaders[i+1] + if strings.HasPrefix(k, ":") { + m.staticPseudo = append(m.staticPseudo, k, v) + } else { + m.staticRegular = append(m.staticRegular, k, v) + } + } + + return m +} + +func (f *defaultMiddleware) IsAllowed(k string) (allowed bool) { + if k == "" { + return false + } + // отфильтровываем псевдохедеры из меты т.к. + // стандартный клиент также псевдохедеры не пропускает + // и псевдохедерами можно легко сломать стрельбу + if k[0] == ':' { + return false + } + switch k { + case "content-type", "te", "grpc-timeout": + return false + } + return f.next.IsAllowed(k) +} + +func (m *defaultMiddleware) WriteAdditional(hpack types.HPackFieldWriter) { + // добавляем статичные псевдохедеры + staticPseudo := m.staticPseudo + for i := 0; i < len(staticPseudo); i += 2 { + //nolint:errcheck // пишем в буфер, это безопасно + hpack.WriteField(staticPseudo[i], staticPseudo[i+1]) + } + + m.next.WriteAdditional(hpack) + + // добавляем статичные хедеры + staticRegular := m.staticRegular + for i := 0; i < len(staticRegular); i += 2 { + //nolint:errcheck // пишем в буфер, это безопасно + hpack.WriteField(staticRegular[i], staticRegular[i+1]) + } +} diff --git a/datasource/request.go b/datasource/request.go index 9d81fad..1edbc0a 100644 --- a/datasource/request.go +++ b/datasource/request.go @@ -2,68 +2,44 @@ package datasource import ( "bytes" - "strings" + "fmt" "time" - "unsafe" - "github.com/ozontech/framer/formats/model" + "github.com/ozontech/framer/datasource/decoder" "github.com/ozontech/framer/frameheader" "github.com/ozontech/framer/loader/types" grpcutil "github.com/ozontech/framer/utils/grpc" "golang.org/x/net/http2" ) +type config struct { + additionalHeaders []string + metaMiddleware MetaMiddleware +} + type Option interface { - apply(f *RequestAdapterFactory) + apply(f *config) } type RequestAdapterFactory struct { - staticPseudoHeaders []string - staticRegularHeaders []string - headerFilter func(k string) (mustSkip bool) + metaMiddleware MetaMiddleware } func NewRequestAdapterFactory(ops ...Option) *RequestAdapterFactory { - f := &RequestAdapterFactory{ - headerFilter: func(k string) (allowed bool) { return true }, - staticPseudoHeaders: []string{ - ":method", "POST", - ":scheme", "http", - }, - staticRegularHeaders: []string{ - "content-type", "application/grpc", - "te", "trailers", - }, + c := config{ + metaMiddleware: noopMiddleware{}, } for _, o := range ops { - o.apply(f) + o.apply(&c) } - return f -} -func (f *RequestAdapterFactory) Build() *RequestAdapter { - return NewRequestAdapter( - f.isAllowedMeta, - f.staticPseudoHeaders, - f.staticRegularHeaders, - ) + return &RequestAdapterFactory{ + metaMiddleware: newDefaultMiddleware(c.metaMiddleware, c.additionalHeaders...), + } } -func (f *RequestAdapterFactory) isAllowedMeta(k string) (allowed bool) { - if k == "" { - return false - } - // отфильтровываем псевдохедеры из меты т.к. - // стандартный клиент также псевдохедеры не пропускает - // и псевдохедерами можно легко сломать стрельбу - if k[0] == ':' { - return false - } - switch k { - case "content-type", "te", "grpc-timeout": - return false - } - return f.headerFilter(k) +func (f *RequestAdapterFactory) Build() *RequestAdapter { + return NewRequestAdapter(f.metaMiddleware) } type frameHeaders []byte @@ -86,80 +62,54 @@ func (fp *frameHeaders) Get() frameheader.FrameHeader { } type RequestAdapter struct { - isAllowedMeta func(k string) bool - staticPseudo []string - staticRegular []string - size int + metaMiddleware MetaMiddleware + size int frameHeaders *frameHeaders payloadPrefix [5]byte frames []types.Frame headersBuf *bytes.Buffer - data model.Data + data decoder.Data } -func NewRequestAdapter( - isAllowedMeta func(k string) bool, - staticPseudo []string, - staticRegular []string, -) *RequestAdapter { +func NewRequestAdapter(metaMiddleware MetaMiddleware) *RequestAdapter { return &RequestAdapter{ - frameHeaders: newFrameHeaders(), - frames: make([]types.Frame, 2), - isAllowedMeta: isAllowedMeta, - staticPseudo: staticPseudo, - staticRegular: staticRegular, - headersBuf: bytes.NewBuffer(nil), + frameHeaders: newFrameHeaders(), + frames: make([]types.Frame, 2), + metaMiddleware: metaMiddleware, + headersBuf: bytes.NewBuffer(nil), } } -func (a *RequestAdapter) setData(data model.Data) { a.data = data } -func (a *RequestAdapter) FullMethodName() string { return unsafeString(a.data.Method) } -func (a *RequestAdapter) Tag() string { return unsafeString(a.data.Tag) } - -// TODO(pgribanov): после реализации собственной системы энкодинга хедеров, -// отказаться от unsafe -func unsafeString(b []byte) string { - //nolint:gosec - return unsafe.String(&b[0], len(b)) -} +func (a *RequestAdapter) setData(data decoder.Data) { a.data = data } +func (a *RequestAdapter) FullMethodName() string { return a.data.Method } +func (a *RequestAdapter) Tag() string { return a.data.Tag } func (a *RequestAdapter) setUpHeaders( maxFramePayloadLen int, + maxHeaderListSize int, // https://datatracker.ietf.org/doc/html/rfc9113#section-6.5.2-2.12.1 streamID uint32, hpack types.HPackFieldWriter, -) { +) error { a.size = 0 + var headerListSize int // https://datatracker.ietf.org/doc/html/rfc9113#section-6.5.2-2.12.1 hpack.SetWriter(a.headersBuf) data := a.data - //nolint:errcheck // пишем в буфер, это безопасно - hpack.WriteField(":path", unsafeString(data.Method)) - - // добавляем статичные псевдохедеры - staticPseudo := a.staticPseudo - for i := 0; i < len(staticPseudo); i += 2 { - //nolint:errcheck // пишем в буфер, это безопасно - hpack.WriteField(staticPseudo[i], staticPseudo[i+1]) - } - - // добавляем статичные хедеры - staticRegular := a.staticRegular - for i := 0; i < len(staticRegular); i += 2 { - //nolint:errcheck // пишем в буфер, это безопасно - hpack.WriteField(staticRegular[i], staticRegular[i+1]) - } + headerListSize += len(":path") + len(data.Method) + hpack.WriteField(":path", data.Method) + a.metaMiddleware.WriteAdditional(hpack) // добавляем мету из запроса for _, m := range data.Metadata { - k := unsafeString(m.Name) - v := unsafeString(m.Value) - if !a.isAllowedMeta(k) { + if !a.metaMiddleware.IsAllowed(m.Name) { continue } - - //nolint:errcheck // пишем в буфер, это безопасно - hpack.WriteField(k, v) + headerListSize += len(m.Name) + len(m.Value) + hpack.WriteField(m.Name, m.Value) + } + if headerListSize > maxHeaderListSize { + return fmt.Errorf("header list size exeed limit: %d > %d", headerListSize, maxHeaderListSize) } for { @@ -190,6 +140,7 @@ func (a *RequestAdapter) setUpHeaders( break } } + return nil } func (a *RequestAdapter) setUpPayload( @@ -269,41 +220,51 @@ func (a *RequestAdapter) Size() int { func (a *RequestAdapter) SetUp( maxFramePayloadLen int, + maxHeaderListSize int, streamID uint32, hpackFieldWriter types.HPackFieldWriter, -) []types.Frame { +) ([]types.Frame, error) { a.headersBuf.Reset() a.frameHeaders.Reset() a.frames = a.frames[:0] - a.setUpHeaders(maxFramePayloadLen, streamID, hpackFieldWriter) + err := a.setUpHeaders(maxFramePayloadLen, maxHeaderListSize, streamID, hpackFieldWriter) a.setUpPayload(maxFramePayloadLen, streamID) - return a.frames + return a.frames, err } func WithAdditionalHeader(k, v string) Option { - return additionalHeadersOpts([]string{k, v}) + return fnOption(func(c *config) { + c.additionalHeaders = append(c.additionalHeaders, k, v) + }) } func WithAdditionalHeaders(headers []string) Option { - return additionalHeadersOpts(headers) + return fnOption(func(c *config) { + c.additionalHeaders = append(c.additionalHeaders, headers...) + }) } func WithTimeout(t time.Duration) Option { - return additionalHeadersOpts([]string{"grpc-timeout", grpcutil.EncodeDuration(t)}) + return fnOption(func(c *config) { + c.additionalHeaders = append(c.additionalHeaders, "grpc-timeout", grpcutil.EncodeDuration(t)) + }) } -type additionalHeadersOpts []string +func WithMetaMiddleware(mw MetaMiddleware) Option { + return fnOption(func(c *config) { c.metaMiddleware = mw }) +} -func (h additionalHeadersOpts) apply(f *RequestAdapterFactory) { - for i := 0; i < len(h); i += 2 { - k, v := h[i], h[i+1] - if strings.HasPrefix(k, ":") { - f.staticPseudoHeaders = append(f.staticPseudoHeaders, k, v) - } else { - f.staticRegularHeaders = append(f.staticRegularHeaders, k, v) - } - } +type metaMiddlewareOpt struct { + mw MetaMiddleware +} + +func (o metaMiddlewareOpt) apply(f *RequestAdapterFactory) { + f.metaMiddleware = o.mw } + +type fnOption func(c *config) + +func (fn fnOption) apply(c *config) { fn(c) } diff --git a/datasource/request_test.go b/datasource/request_test.go index eaa8231..51be94d 100644 --- a/datasource/request_test.go +++ b/datasource/request_test.go @@ -10,7 +10,8 @@ import ( "golang.org/x/net/http2/hpack" "github.com/ozontech/framer/consts" - "github.com/ozontech/framer/formats/model" + "github.com/ozontech/framer/datasource/decoder" + "github.com/ozontech/framer/loader/types" hpackwrapper "github.com/ozontech/framer/utils/hpack_wrapper" ) @@ -48,14 +49,19 @@ func TestFrameHeadersPool(t *testing.T) { } } +type metaMW struct{} + +func (metaMW) IsAllowed(key string) bool { return true } +func (metaMW) WriteAdditional(fw types.HPackFieldWriter) { + fw.WriteField(":method", "POST") + fw.WriteField(":authority", ":authority-v") + fw.WriteField("regular-k", "regular-v") +} + func TestRequest1(t *testing.T) { t.Parallel() a := assert.New(t) - r := NewRequestAdapter( - func(string) bool { return true }, - []string{":method", "POST", ":authority", ":authority-v"}, - []string{"regular-k", "regular-v"}, - ) + r := NewRequestAdapter(metaMW{}) buf := bytes.NewBuffer(nil) framer := http2.NewFramer(nil, buf) framer.ReadMetaHeaders = hpack.NewDecoder(4098, nil) @@ -63,12 +69,12 @@ func TestRequest1(t *testing.T) { const interations = 10 for i := 0; i < interations; i++ { message := []byte("this is message") - r.data = model.Data{ - Tag: []byte("tag"), - Method: []byte("/method"), - Metadata: []model.Meta{ - {Name: []byte("k1"), Value: []byte("v1")}, - {Name: []byte("k2"), Value: []byte("v2")}, + r.data = decoder.Data{ + Tag: "tag", + Method: "/method", + Metadata: []decoder.Meta{ + {Name: "k1", Value: "v1"}, + {Name: "k2", Value: "v2"}, }, Message: message, } @@ -77,7 +83,8 @@ func TestRequest1(t *testing.T) { hpw := hpackwrapper.NewWrapper() const streamID uint32 = 123 - frames := r.SetUp(consts.DefaultMaxFrameSize, streamID, hpw) + frames, err := r.SetUp(consts.DefaultMaxFrameSize, consts.DefaultMaxHeaderListSize, streamID, hpw) + a.NoError(err) a.Len(frames, 2) for _, f := range frames { for _, c := range f.Chunks { diff --git a/formats/grpc/ozon/binary/io/io.go b/formats/grpc/ozon/binary/io/io.go index abda3bd..b48fd41 100644 --- a/formats/grpc/ozon/binary/io/io.go +++ b/formats/grpc/ozon/binary/io/io.go @@ -9,14 +9,21 @@ import ( const nChar = '\n' type Reader struct { - buf [4096]byte + buf []byte unprocessed []byte r io.Reader } -func NewReader(r io.Reader) *Reader { - return &Reader{r: r} +func NewReader(r io.Reader, bufSize ...int) *Reader { + size := 4096 + if len(bufSize) > 0 { + size = bufSize[0] + } + return &Reader{ + buf: make([]byte, size), + r: r, + } } func (r *Reader) fillUnprocessed() error { @@ -72,11 +79,7 @@ func (r *Reader) ReadNext(b []byte) ([]byte, error) { r.unprocessed = r.unprocessed[n:] } - for len(r.unprocessed) > 0 { - if r.unprocessed[0] != nChar { - break - } - r.unprocessed = r.unprocessed[1:] + for { if len(r.unprocessed) == 0 { err = r.fillUnprocessed() if err == io.EOF { @@ -85,6 +88,11 @@ func (r *Reader) ReadNext(b []byte) ([]byte, error) { if err != nil { return nil, err } + } else { + if r.unprocessed[0] != nChar { + break + } + r.unprocessed = r.unprocessed[1:] } } diff --git a/formats/grpc/ozon/json/encoding/decoder.go b/formats/grpc/ozon/json/encoding/decoder.go index 8f03825..19b5f9c 100644 --- a/formats/grpc/ozon/json/encoding/decoder.go +++ b/formats/grpc/ozon/json/encoding/decoder.go @@ -5,14 +5,12 @@ import ( "errors" "fmt" + "github.com/jhump/protoreflect/dynamic" "github.com/mailru/easyjson/jlexer" "github.com/ozontech/framer/formats/grpc/ozon/json/encoding/reflection" "github.com/ozontech/framer/formats/internal/kv" jsonkv "github.com/ozontech/framer/formats/internal/kv/json" "github.com/ozontech/framer/formats/model" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/dynamicpb" ) type DecoderOption func(*Decoder) @@ -48,7 +46,7 @@ func (decoder *Decoder) Unmarshal(d *model.Data, b []byte) error { var ( jsonPayload []byte - dynamicMessage *dynamicpb.Message + dynamicMessage *dynamic.Message ) in.Delim('{') for !in.IsDelim('}') { @@ -86,13 +84,13 @@ func (decoder *Decoder) Unmarshal(d *model.Data, b []byte) error { return fmt.Errorf(`"call" is required`) } - if err = protojson.Unmarshal(jsonPayload, dynamicMessage); err != nil { + if err = dynamicMessage.UnmarshalJSON(jsonPayload); err != nil { return fmt.Errorf( "unmarshalling json payload for %s: %w", d.Method, err, ) } - d.Message, err = proto.MarshalOptions{}.MarshalAppend(d.Message, dynamicMessage) + d.Message, err = dynamicMessage.MarshalAppend(d.Message) if err != nil { return fmt.Errorf("marshaling grpc message into binary: %w", err) } diff --git a/formats/grpc/ozon/json/encoding/encoder.go b/formats/grpc/ozon/json/encoding/encoder.go index 8404745..cd97dab 100644 --- a/formats/grpc/ozon/json/encoding/encoder.go +++ b/formats/grpc/ozon/json/encoding/encoder.go @@ -9,8 +9,6 @@ import ( "github.com/ozontech/framer/formats/internal/kv" jsonkv "github.com/ozontech/framer/formats/internal/kv/json" "github.com/ozontech/framer/formats/model" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" ) type Encoder struct { @@ -37,13 +35,14 @@ func (encoder *Encoder) MarshalAppend(b []byte, d *model.Data) ([]byte, error) { b = encoder.metaMarshaler.MarshalAppend(b, d.Metadata) b = append(b, `,"payload":`...) - if err := proto.Unmarshal(d.Message, message); err != nil { + if err := message.Unmarshal(d.Message); err != nil { return b, fmt.Errorf("unmarshaling binary payload for %s: %w", d.Method, err) } - b, err := protojson.MarshalOptions{}.MarshalAppend(b, message) + jsonBody, err := message.MarshalJSON() if err != nil { return b, fmt.Errorf("protojson marshal: %w", err) } + b = append(b, jsonBody...) return append(b, '}'), nil } diff --git a/formats/grpc/ozon/json/encoding/reflection/reflector.go b/formats/grpc/ozon/json/encoding/reflection/reflector.go index 5e7f725..b0086ce 100644 --- a/formats/grpc/ozon/json/encoding/reflection/reflector.go +++ b/formats/grpc/ozon/json/encoding/reflection/reflector.go @@ -10,6 +10,7 @@ import ( "github.com/jhump/protoreflect/desc/protoparse" "github.com/jhump/protoreflect/grpcreflect" "google.golang.org/grpc" + reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" ) type Fetcher interface { @@ -92,7 +93,7 @@ func (f *RemoteFetcher) Warnings() []string { } func (f *RemoteFetcher) Fetch(ctx context.Context) (DynamicMessagesStore, error) { - refClient := grpcreflect.NewClientAuto(ctx, f.conn) + refClient := grpcreflect.NewClient(ctx, reflectpb.NewServerReflectionClient(f.conn)) listServices, err := refClient.ListServices() if err != nil { return nil, fmt.Errorf("reflection fetching: %w", err) diff --git a/formats/grpc/ozon/json/encoding/reflection/store.go b/formats/grpc/ozon/json/encoding/reflection/store.go index 88981db..3843d2c 100644 --- a/formats/grpc/ozon/json/encoding/reflection/store.go +++ b/formats/grpc/ozon/json/encoding/reflection/store.go @@ -5,24 +5,24 @@ import ( "sync" "github.com/jhump/protoreflect/desc" - "google.golang.org/protobuf/types/dynamicpb" + "github.com/jhump/protoreflect/dynamic" ) type DynamicMessagesStore interface { // must return nil if not found - Get(methodName []byte) (message *dynamicpb.Message, release func()) + Get(methodName []byte) (message *dynamic.Message, release func()) } type dynamicMessagesStore struct { items map[string]*sync.Pool } -func (s *dynamicMessagesStore) Get(methodName []byte) (*dynamicpb.Message, func()) { +func (s *dynamicMessagesStore) Get(methodName []byte) (*dynamic.Message, func()) { pool, ok := s.items[string(methodName)] if !ok { return nil, nil } - message := pool.Get().(*dynamicpb.Message) + message := pool.Get().(*dynamic.Message) return message, func() { pool.Put(message) } } @@ -33,7 +33,7 @@ func NewDynamicMessagesStore(descriptors []*desc.MethodDescriptor) DynamicMessag fqn := string(NormalizeMethod([]byte(descriptor.GetFullyQualifiedName()))) items[fqn] = &sync.Pool{New: func() interface{} { - return dynamicpb.NewMessage(descriptor.GetInputType().UnwrapMessage()) + return dynamic.NewMessage(descriptor.GetInputType()) }} } return &dynamicMessagesStore{items} diff --git a/formats/model/model.go b/formats/model/model.go index dda8d39..4019475 100644 --- a/formats/model/model.go +++ b/formats/model/model.go @@ -31,7 +31,7 @@ type RequestReader interface { ReadNext([]byte) ([]byte, error) } -type PooledRequestReder interface { +type PooledRequestReader interface { ReadNext() ([]byte, error) Release([]byte) } @@ -41,7 +41,7 @@ type RequestWriter interface { } type InputFormat struct { - Reader PooledRequestReder + Reader PooledRequestReader Decoder Unmarshaler } diff --git a/go.mod b/go.mod index 3a1aa56..f3f4622 100644 --- a/go.mod +++ b/go.mod @@ -1,37 +1,34 @@ module github.com/ozontech/framer -go 1.22.2 +go 1.21 require ( github.com/alecthomas/kong v0.9.0 github.com/alecthomas/mango-kong v0.1.0 github.com/dustin/go-humanize v1.0.1 - github.com/jhump/protoreflect v1.16.0 + github.com/jhump/protoreflect v1.12.0 github.com/mailru/easyjson v0.7.7 github.com/stretchr/testify v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.24.0 - golang.org/x/net v0.25.0 + golang.org/x/net v0.27.0 golang.org/x/sync v0.7.0 - google.golang.org/grpc v1.64.0 - google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002 + google.golang.org/grpc v1.52.0 + google.golang.org/protobuf v1.34.2 ) require ( github.com/benbjohnson/clock v1.1.0 // indirect - github.com/bufbuild/protocompile v0.10.0 // indirect + github.com/bufbuild/protocompile v0.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/kr/pretty v0.3.0 // indirect github.com/muesli/mango v0.1.1-0.20220205060214-77e2058169ab // indirect github.com/muesli/roff v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.uber.org/atomic v1.10.0 // indirect - go.uber.org/goleak v1.1.12 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + go.uber.org/atomic v1.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b02a3be..dd6e6e7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/assert/v2 v2.6.0 h1:o3WJwILtexrEUk3cUVal3oiQY2tfgr/FHWiz/v2n4FU= github.com/alecthomas/assert/v2 v2.6.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/kong v0.9.0 h1:G5diXxc85KvoV2f0ZRVuMsi45IrBgx9zDNGNj165aPA= @@ -8,32 +10,58 @@ github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/bufbuild/protocompile v0.10.0 h1:+jW/wnLMLxaCEG8AX9lD0bQ5v9h1RUiMKOBOT5ll9dM= -github.com/bufbuild/protocompile v0.10.0/go.mod h1:G9qQIQo0xZ6Uyj6CMNz0saGmx2so+KONo8/KrELABiY= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= +github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= -github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg= -github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8= +github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= +github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= +github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E= +github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01HR10= +github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/muesli/mango v0.1.1-0.20220205060214-77e2058169ab h1:m7QFONkzLK0fVXCjwX5tANcnj1yXxTnYQtnfJiY3tcA= @@ -44,68 +72,93 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= -go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= -google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002 h1:V7Da7qt0MkY3noVANIMVBk28nOnijADeOR3i5Hcvpj4= -google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc h1:ijGwO+0vL2hJt5gaygqP2j6PfflOBrRot0IczKbmtio= +google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.52.0 h1:kd48UiU7EHsV4rnLyOJRuP/Il/UHE7gdDAQ+SZI7nZk= +google.golang.org/grpc v1.52.0/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/loader/e2e_test.go b/loader/e2e_test.go index 93f384e..55ef7af 100644 --- a/loader/e2e_test.go +++ b/loader/e2e_test.go @@ -28,7 +28,7 @@ func TestE2E(t *testing.T) { a := assert.New(t) clientConn, serverConn := net.Pipe() l := newLoader( - clientConn, nooReporter{}, + clientConn, noopReporter{}, loaderConfig{timeout: consts.DefaultTimeout}, log, ) @@ -162,18 +162,22 @@ func TestE2E(t *testing.T) { a.NoError(g.Wait()) } -type nooReporter struct{} +type noopReporter struct{} -func (a nooReporter) Acquire(string) types.StreamState { +func (a noopReporter) Acquire(string, uint32) types.StreamState { return streamState{} } type streamState struct{} -func (s streamState) SetSize(int) {} -func (s streamState) Reset(string) {} -func (s streamState) OnHeader(string, string) {} -func (s streamState) IoError(error) {} -func (s streamState) RSTStream(http2.ErrCode) {} -func (s streamState) GoAway(http2.ErrCode) {} -func (s streamState) End() {} +func (s streamState) FirstByteSent() {} +func (s streamState) LastByteSent() {} +func (s streamState) RequestError(error) {} +func (s streamState) SetSize(int) {} +func (s streamState) Reset(string) {} +func (s streamState) OnHeader(string, string) {} +func (s streamState) IoError(error) {} +func (s streamState) RSTStream(http2.ErrCode) {} +func (s streamState) GoAway(http2.ErrCode, []byte) {} +func (s streamState) Timeout() {} +func (s streamState) End() {} diff --git a/loader/loader.go b/loader/loader.go index f98cce2..bfdc916 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "net" "sync/atomic" "time" @@ -19,6 +18,7 @@ import ( fc "github.com/ozontech/framer/loader/flowcontrol" "github.com/ozontech/framer/loader/reciever" "github.com/ozontech/framer/loader/sender" + "github.com/ozontech/framer/loader/streams/limiter" streamsPool "github.com/ozontech/framer/loader/streams/pool" streamsStore "github.com/ozontech/framer/loader/streams/store" "github.com/ozontech/framer/loader/types" @@ -46,20 +46,33 @@ type Loader struct { log *zap.Logger } +var loaderID atomic.Uint32 + func NewLoader( conn net.Conn, - reporter types.Reporter, + reporter types.LoaderReporter, timeout time.Duration, + disableSendBatching bool, log *zap.Logger, ) (*Loader, error) { - conf := loaderConfig{timeout: timeout} + loaderID := loaderID.Add(1) + log = log.Named("loader").With(zap.Uint32("loader-id", loaderID)) + conf := loaderConfig{ + timeout: timeout, + disableSendBatching: disableSendBatching, + } err := conn.SetDeadline(time.Now().Add(conf.timeout)) if err != nil { return nil, fmt.Errorf("set conn deadline: %w", err) } - err = setupHTTP2(conn, conn, &conf) + setupLog := log + if loaderID != 1 { + setupLog = log.WithOptions(zap.IncreaseLevel(zap.ErrorLevel)) + } + + err = setupHTTP2(conn, conn, &conf, setupLog) if err != nil { return nil, err } @@ -68,11 +81,14 @@ func NewLoader( } type loaderConfig struct { - timeout time.Duration + disableSendBatching bool + timeout time.Duration + maxConcurrentStreams uint32 initialWindowSize uint32 maxDymanicTableSize uint32 maxFrameSize uint32 + maxHeaderListSize uint32 } var i int32 @@ -87,25 +103,18 @@ func newLoader( conf.timeout = consts.DefaultTimeout } loaderID := atomic.AddInt32(&i, 1) - log = log.Named("loader").With(zap.Int32("loader-id", loaderID)) log.Debug("loader created") - streamsStore := streamsStore.NewShardedStreamsMap(16, func() types.StreamStore { - return streamsStore.NewStreamsMap() - }) timeoutQueue := NewTimeoutQueue(conf.timeout) fcConn := fc.NewFlowControl(consts.DefaultInitialWindowSize) // для соединения (по спеке игнорирует SETTINGS_INITIAL_WINDOW_SIZE) var streamPoolOpts []streamsPool.Opt - if conf.maxConcurrentStreams != 0 { - streamPoolOpts = append(streamPoolOpts, streamsPool.WithMaxConcurrentStreams(conf.maxConcurrentStreams)) - } if conf.initialWindowSize != 0 { streamPoolOpts = append(streamPoolOpts, streamsPool.WithInitialWindowSize(conf.initialWindowSize)) } streamsPool := streamsPool.NewStreamsPool(reporter, streamPoolOpts...) - var hpackWrapperOpts []hpackwrapper.Opt + hpackWrapperOpts := []hpackwrapper.Opt{} if conf.maxDymanicTableSize != 0 { hpackWrapperOpts = append(hpackWrapperOpts, hpackwrapper.WithMaxDynamicTableSize(conf.maxDymanicTableSize)) } @@ -116,22 +125,36 @@ func newLoader( maxFrameSize = int(conf.maxFrameSize) } + maxHeaderListSize := consts.DefaultMaxHeaderListSize + if conf.maxHeaderListSize != 0 { + maxHeaderListSize = int(conf.maxHeaderListSize) + } + priorityFramesCh := make(chan []byte, 1) + streams := types.Streams{ + Pool: streamsPool, + Limiter: limiter.New(conf.maxConcurrentStreams), + Store: streamsStore.NewShardedStreamsMap(16, func() types.StreamStore { + return streamsStore.NewStreamsMap(1) + }), + } return &Loader{ conn: conn, timeoutQueue: timeoutQueue, log: log, loaderID: loaderID, - streamsStore: streamsStore, + streamsStore: streams.Store, streamsPool: streamsPool, sender: sender.NewSender( - conn, fcConn, priorityFramesCh, streamsPool, - streamsStore, hpackWrapper, maxFrameSize, + log, + conn, fcConn, priorityFramesCh, streams, hpackWrapper, + maxFrameSize, maxHeaderListSize, + conf.disableSendBatching, ), reciever: reciever.NewReciever( - conn, fcConn, priorityFramesCh, streamsStore, + conn, fcConn, priorityFramesCh, streams, ), } } @@ -237,9 +260,14 @@ func (l *Loader) runReciever(ctx context.Context) (err error) { return err } + l.log.Info( + "got goaway", + zap.Uint32("last_stream_id", goAwayErr.LastStreamID), + zap.ByteString("debug_data", goAwayErr.DebugData), + ) l.streamsStore.Each(func(s types.Stream) { if s.ID() > goAwayErr.LastStreamID { - s.GoAway(goAwayErr.Code) + s.GoAway(goAwayErr.Code, goAwayErr.DebugData) s.End() } }) @@ -261,10 +289,19 @@ func (l *Loader) runSender(ctx context.Context) (err error) { } func (l *Loader) DoRequest(req types.Req) { + defer func() { + r := recover() + if r == nil { + return + } + + l.log.Error("shoot panicked", zap.Any("panic", r)) + panic(r) + }() l.sender.Send(req) } -func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig) error { +func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig, log *zap.Logger) error { // we should not check n, because Write must return error on n < len(clientPreface) _, err := w.Write(clientPreface) if err != nil { @@ -284,8 +321,10 @@ func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig) error { return errors.New("protocol error: first frame from server is not settings") } + var logFields []zap.Field for i := 0; i < sf.NumSettings(); i++ { s := sf.Setting(i) + logFields = append(logFields, zap.Uint32("setting_"+s.ID.String(), s.Val)) switch s.ID { case http2.SettingInitialWindowSize: conf.initialWindowSize = s.Val @@ -295,15 +334,21 @@ func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig) error { conf.maxDymanicTableSize = s.Val case http2.SettingMaxFrameSize: conf.maxFrameSize = s.Val + case http2.SettingMaxHeaderListSize: + conf.maxHeaderListSize = s.Val default: - return fmt.Errorf("got not supported setting: %s (%d)", s.ID.String(), s.Val) + log.Sugar().Warnf("got not supported setting: %s (%d)", s.ID.String(), s.Val) } } - - err = framer.WriteSettings(http2.Setting{ - ID: http2.SettingInitialWindowSize, - Val: math.MaxUint32 & 0x7fffffff, // mask off high reserved bit - }) + log.Info("got settings", logFields...) + + err = framer.WriteSettings( + // TODO: C# have problems and disconnect with FLOW_CONTROL_ERROR + // http2.Setting{ + // ID: http2.SettingInitialWindowSize, + // Val: math.MaxUint32 & 0x7fffffff, // mask off high reserved bit + // }, + ) if err != nil { return fmt.Errorf("write settings frame: %w", err) } @@ -313,10 +358,11 @@ func setupHTTP2(r io.Reader, w io.Writer, conf *loaderConfig) error { return fmt.Errorf("write settings ack: %w", err) } - err = framer.WriteWindowUpdate(0, math.MaxUint32&0x7fffffff) - if err != nil { - return fmt.Errorf("write window update frame: %w", err) - } + // TODO: C# have problems and disconnect with FLOW_CONTROL_ERROR + // err = framer.WriteWindowUpdate(0, math.MaxUint32&0x7fffffff) + // if err != nil { + // return fmt.Errorf("write window update frame: %w", err) + // } return nil } diff --git a/loader/reciever/mock_generate_test.go b/loader/reciever/mock_generate_test.go index 0e71140..381ce1c 100644 --- a/loader/reciever/mock_generate_test.go +++ b/loader/reciever/mock_generate_test.go @@ -1,3 +1,3 @@ -//go:generate moq -pkg reciever -out ./mock_loader_types_test.go ../types StreamStore Stream FlowControl -//go:generate moq -pkg reciever -fmt goimports -out ./mock_reciever_test.go . FrameTypeProcessor +//go:generate moq -pkg reciever -out ./mock_loader_types_test.go ../types StreamStore StreamsLimiter Stream FlowControl +//go:generate moq -pkg reciever -out ./mock_reciever_test.go . FrameTypeProcessor package reciever diff --git a/loader/reciever/mock_loader_types_test.go b/loader/reciever/mock_loader_types_test.go index 535c416..8cd5cb8 100644 --- a/loader/reciever/mock_loader_types_test.go +++ b/loader/reciever/mock_loader_types_test.go @@ -257,6 +257,102 @@ func (mock *StreamStoreMock) SetCalls() []struct { return calls } +// Ensure, that StreamsLimiterMock does implement types.StreamsLimiter. +// If this is not the case, regenerate this file with moq. +var _ types.StreamsLimiter = &StreamsLimiterMock{} + +// StreamsLimiterMock is a mock implementation of types.StreamsLimiter. +// +// func TestSomethingThatUsesStreamsLimiter(t *testing.T) { +// +// // make and configure a mocked types.StreamsLimiter +// mockedStreamsLimiter := &StreamsLimiterMock{ +// ReleaseFunc: func() { +// panic("mock out the Release method") +// }, +// WaitAllowFunc: func() { +// panic("mock out the WaitAllow method") +// }, +// } +// +// // use mockedStreamsLimiter in code that requires types.StreamsLimiter +// // and then make assertions. +// +// } +type StreamsLimiterMock struct { + // ReleaseFunc mocks the Release method. + ReleaseFunc func() + + // WaitAllowFunc mocks the WaitAllow method. + WaitAllowFunc func() + + // calls tracks calls to the methods. + calls struct { + // Release holds details about calls to the Release method. + Release []struct { + } + // WaitAllow holds details about calls to the WaitAllow method. + WaitAllow []struct { + } + } + lockRelease sync.RWMutex + lockWaitAllow sync.RWMutex +} + +// Release calls ReleaseFunc. +func (mock *StreamsLimiterMock) Release() { + if mock.ReleaseFunc == nil { + panic("StreamsLimiterMock.ReleaseFunc: method is nil but StreamsLimiter.Release was just called") + } + callInfo := struct { + }{} + mock.lockRelease.Lock() + mock.calls.Release = append(mock.calls.Release, callInfo) + mock.lockRelease.Unlock() + mock.ReleaseFunc() +} + +// ReleaseCalls gets all the calls that were made to Release. +// Check the length with: +// +// len(mockedStreamsLimiter.ReleaseCalls()) +func (mock *StreamsLimiterMock) ReleaseCalls() []struct { +} { + var calls []struct { + } + mock.lockRelease.RLock() + calls = mock.calls.Release + mock.lockRelease.RUnlock() + return calls +} + +// WaitAllow calls WaitAllowFunc. +func (mock *StreamsLimiterMock) WaitAllow() { + if mock.WaitAllowFunc == nil { + panic("StreamsLimiterMock.WaitAllowFunc: method is nil but StreamsLimiter.WaitAllow was just called") + } + callInfo := struct { + }{} + mock.lockWaitAllow.Lock() + mock.calls.WaitAllow = append(mock.calls.WaitAllow, callInfo) + mock.lockWaitAllow.Unlock() + mock.WaitAllowFunc() +} + +// WaitAllowCalls gets all the calls that were made to WaitAllow. +// Check the length with: +// +// len(mockedStreamsLimiter.WaitAllowCalls()) +func (mock *StreamsLimiterMock) WaitAllowCalls() []struct { +} { + var calls []struct { + } + mock.lockWaitAllow.RLock() + calls = mock.calls.WaitAllow + mock.lockWaitAllow.RUnlock() + return calls +} + // Ensure, that StreamMock does implement types.Stream. // If this is not the case, regenerate this file with moq. var _ types.Stream = &StreamMock{} @@ -273,7 +369,10 @@ var _ types.Stream = &StreamMock{} // FCFunc: func() types.FlowControl { // panic("mock out the FC method") // }, -// GoAwayFunc: func(code http2.ErrCode) { +// FirstByteSentFunc: func() { +// panic("mock out the FirstByteSent method") +// }, +// GoAwayFunc: func(code http2.ErrCode, debugData []byte) { // panic("mock out the GoAway method") // }, // IDFunc: func() uint32 { @@ -282,15 +381,24 @@ var _ types.Stream = &StreamMock{} // IoErrorFunc: func(err error) { // panic("mock out the IoError method") // }, +// LastByteSentFunc: func() { +// panic("mock out the LastByteSent method") +// }, // OnHeaderFunc: func(name string, value string) { // panic("mock out the OnHeader method") // }, // RSTStreamFunc: func(code http2.ErrCode) { // panic("mock out the RSTStream method") // }, +// RequestErrorFunc: func(err error) { +// panic("mock out the RequestError method") +// }, // SetSizeFunc: func(n int) { // panic("mock out the SetSize method") // }, +// TimeoutFunc: func() { +// panic("mock out the Timeout method") +// }, // } // // // use mockedStream in code that requires types.Stream @@ -304,8 +412,11 @@ type StreamMock struct { // FCFunc mocks the FC method. FCFunc func() types.FlowControl + // FirstByteSentFunc mocks the FirstByteSent method. + FirstByteSentFunc func() + // GoAwayFunc mocks the GoAway method. - GoAwayFunc func(code http2.ErrCode) + GoAwayFunc func(code http2.ErrCode, debugData []byte) // IDFunc mocks the ID method. IDFunc func() uint32 @@ -313,15 +424,24 @@ type StreamMock struct { // IoErrorFunc mocks the IoError method. IoErrorFunc func(err error) + // LastByteSentFunc mocks the LastByteSent method. + LastByteSentFunc func() + // OnHeaderFunc mocks the OnHeader method. OnHeaderFunc func(name string, value string) // RSTStreamFunc mocks the RSTStream method. RSTStreamFunc func(code http2.ErrCode) + // RequestErrorFunc mocks the RequestError method. + RequestErrorFunc func(err error) + // SetSizeFunc mocks the SetSize method. SetSizeFunc func(n int) + // TimeoutFunc mocks the Timeout method. + TimeoutFunc func() + // calls tracks calls to the methods. calls struct { // End holds details about calls to the End method. @@ -330,10 +450,15 @@ type StreamMock struct { // FC holds details about calls to the FC method. FC []struct { } + // FirstByteSent holds details about calls to the FirstByteSent method. + FirstByteSent []struct { + } // GoAway holds details about calls to the GoAway method. GoAway []struct { // Code is the code argument value. Code http2.ErrCode + // DebugData is the debugData argument value. + DebugData []byte } // ID holds details about calls to the ID method. ID []struct { @@ -343,6 +468,9 @@ type StreamMock struct { // Err is the err argument value. Err error } + // LastByteSent holds details about calls to the LastByteSent method. + LastByteSent []struct { + } // OnHeader holds details about calls to the OnHeader method. OnHeader []struct { // Name is the name argument value. @@ -355,20 +483,32 @@ type StreamMock struct { // Code is the code argument value. Code http2.ErrCode } + // RequestError holds details about calls to the RequestError method. + RequestError []struct { + // Err is the err argument value. + Err error + } // SetSize holds details about calls to the SetSize method. SetSize []struct { // N is the n argument value. N int } + // Timeout holds details about calls to the Timeout method. + Timeout []struct { + } } - lockEnd sync.RWMutex - lockFC sync.RWMutex - lockGoAway sync.RWMutex - lockID sync.RWMutex - lockIoError sync.RWMutex - lockOnHeader sync.RWMutex - lockRSTStream sync.RWMutex - lockSetSize sync.RWMutex + lockEnd sync.RWMutex + lockFC sync.RWMutex + lockFirstByteSent sync.RWMutex + lockGoAway sync.RWMutex + lockID sync.RWMutex + lockIoError sync.RWMutex + lockLastByteSent sync.RWMutex + lockOnHeader sync.RWMutex + lockRSTStream sync.RWMutex + lockRequestError sync.RWMutex + lockSetSize sync.RWMutex + lockTimeout sync.RWMutex } // End calls EndFunc. @@ -425,20 +565,49 @@ func (mock *StreamMock) FCCalls() []struct { return calls } +// FirstByteSent calls FirstByteSentFunc. +func (mock *StreamMock) FirstByteSent() { + if mock.FirstByteSentFunc == nil { + panic("StreamMock.FirstByteSentFunc: method is nil but Stream.FirstByteSent was just called") + } + callInfo := struct { + }{} + mock.lockFirstByteSent.Lock() + mock.calls.FirstByteSent = append(mock.calls.FirstByteSent, callInfo) + mock.lockFirstByteSent.Unlock() + mock.FirstByteSentFunc() +} + +// FirstByteSentCalls gets all the calls that were made to FirstByteSent. +// Check the length with: +// +// len(mockedStream.FirstByteSentCalls()) +func (mock *StreamMock) FirstByteSentCalls() []struct { +} { + var calls []struct { + } + mock.lockFirstByteSent.RLock() + calls = mock.calls.FirstByteSent + mock.lockFirstByteSent.RUnlock() + return calls +} + // GoAway calls GoAwayFunc. -func (mock *StreamMock) GoAway(code http2.ErrCode) { +func (mock *StreamMock) GoAway(code http2.ErrCode, debugData []byte) { if mock.GoAwayFunc == nil { panic("StreamMock.GoAwayFunc: method is nil but Stream.GoAway was just called") } callInfo := struct { - Code http2.ErrCode + Code http2.ErrCode + DebugData []byte }{ - Code: code, + Code: code, + DebugData: debugData, } mock.lockGoAway.Lock() mock.calls.GoAway = append(mock.calls.GoAway, callInfo) mock.lockGoAway.Unlock() - mock.GoAwayFunc(code) + mock.GoAwayFunc(code, debugData) } // GoAwayCalls gets all the calls that were made to GoAway. @@ -446,10 +615,12 @@ func (mock *StreamMock) GoAway(code http2.ErrCode) { // // len(mockedStream.GoAwayCalls()) func (mock *StreamMock) GoAwayCalls() []struct { - Code http2.ErrCode + Code http2.ErrCode + DebugData []byte } { var calls []struct { - Code http2.ErrCode + Code http2.ErrCode + DebugData []byte } mock.lockGoAway.RLock() calls = mock.calls.GoAway @@ -516,6 +687,33 @@ func (mock *StreamMock) IoErrorCalls() []struct { return calls } +// LastByteSent calls LastByteSentFunc. +func (mock *StreamMock) LastByteSent() { + if mock.LastByteSentFunc == nil { + panic("StreamMock.LastByteSentFunc: method is nil but Stream.LastByteSent was just called") + } + callInfo := struct { + }{} + mock.lockLastByteSent.Lock() + mock.calls.LastByteSent = append(mock.calls.LastByteSent, callInfo) + mock.lockLastByteSent.Unlock() + mock.LastByteSentFunc() +} + +// LastByteSentCalls gets all the calls that were made to LastByteSent. +// Check the length with: +// +// len(mockedStream.LastByteSentCalls()) +func (mock *StreamMock) LastByteSentCalls() []struct { +} { + var calls []struct { + } + mock.lockLastByteSent.RLock() + calls = mock.calls.LastByteSent + mock.lockLastByteSent.RUnlock() + return calls +} + // OnHeader calls OnHeaderFunc. func (mock *StreamMock) OnHeader(name string, value string) { if mock.OnHeaderFunc == nil { @@ -584,6 +782,38 @@ func (mock *StreamMock) RSTStreamCalls() []struct { return calls } +// RequestError calls RequestErrorFunc. +func (mock *StreamMock) RequestError(err error) { + if mock.RequestErrorFunc == nil { + panic("StreamMock.RequestErrorFunc: method is nil but Stream.RequestError was just called") + } + callInfo := struct { + Err error + }{ + Err: err, + } + mock.lockRequestError.Lock() + mock.calls.RequestError = append(mock.calls.RequestError, callInfo) + mock.lockRequestError.Unlock() + mock.RequestErrorFunc(err) +} + +// RequestErrorCalls gets all the calls that were made to RequestError. +// Check the length with: +// +// len(mockedStream.RequestErrorCalls()) +func (mock *StreamMock) RequestErrorCalls() []struct { + Err error +} { + var calls []struct { + Err error + } + mock.lockRequestError.RLock() + calls = mock.calls.RequestError + mock.lockRequestError.RUnlock() + return calls +} + // SetSize calls SetSizeFunc. func (mock *StreamMock) SetSize(n int) { if mock.SetSizeFunc == nil { @@ -616,6 +846,33 @@ func (mock *StreamMock) SetSizeCalls() []struct { return calls } +// Timeout calls TimeoutFunc. +func (mock *StreamMock) Timeout() { + if mock.TimeoutFunc == nil { + panic("StreamMock.TimeoutFunc: method is nil but Stream.Timeout was just called") + } + callInfo := struct { + }{} + mock.lockTimeout.Lock() + mock.calls.Timeout = append(mock.calls.Timeout, callInfo) + mock.lockTimeout.Unlock() + mock.TimeoutFunc() +} + +// TimeoutCalls gets all the calls that were made to Timeout. +// Check the length with: +// +// len(mockedStream.TimeoutCalls()) +func (mock *StreamMock) TimeoutCalls() []struct { +} { + var calls []struct { + } + mock.lockTimeout.RLock() + calls = mock.calls.Timeout + mock.lockTimeout.RUnlock() + return calls +} + // Ensure, that FlowControlMock does implement types.FlowControl. // If this is not the case, regenerate this file with moq. var _ types.FlowControl = &FlowControlMock{} diff --git a/loader/reciever/mock_reciever_test.go b/loader/reciever/mock_reciever_test.go index e6443bc..de4cced 100644 --- a/loader/reciever/mock_reciever_test.go +++ b/loader/reciever/mock_reciever_test.go @@ -4,9 +4,8 @@ package reciever import ( - "sync" - "github.com/ozontech/framer/frameheader" + "sync" ) // Ensure, that FrameTypeProcessorMock does implement FrameTypeProcessor. diff --git a/loader/reciever/processor.go b/loader/reciever/processor.go index 0f12598..aa34a28 100644 --- a/loader/reciever/processor.go +++ b/loader/reciever/processor.go @@ -27,21 +27,22 @@ func NewProcessor(subprocessors []FrameTypeProcessor) *Processor { } func NewDefaultProcessor( - streams types.StreamStore, + streamsStore types.StreamStore, + streamsLimiter types.StreamsLimiter, fcConn types.FlowControl, priorityFramesChan chan<- []byte, ) *Processor { - headersFrameProcessor := newHeadersFrameProcessor(streams) + headersFrameProcessor := newHeadersFrameProcessor(streamsStore, streamsLimiter) return NewProcessor([]FrameTypeProcessor{ - http2.FrameData: newDataFrameProcessor(priorityFramesChan, streams), + http2.FrameData: newDataFrameProcessor(priorityFramesChan, streamsStore, streamsLimiter), http2.FrameHeaders: headersFrameProcessor, // http2.FramePriority not supported - http2.FrameRSTStream: newRSTStreamFrameProcessor(streams), + http2.FrameRSTStream: newRSTStreamFrameProcessor(streamsStore, streamsLimiter), http2.FrameSettings: settingsProcessor{}, // http2.FramePushPromise not supported http2.FramePing: newPingFrameProcessor(priorityFramesChan), http2.FrameGoAway: newGoAwayFrameProcessor(), - http2.FrameWindowUpdate: newWindowUpdateFrameProcessor(streams, fcConn), + http2.FrameWindowUpdate: newWindowUpdateFrameProcessor(streamsStore, fcConn), http2.FrameContinuation: headersFrameProcessor, }) } @@ -119,19 +120,25 @@ func (p *pingFrameProcessor) Process(_ frameheader.FrameHeader, payload []byte, } type dataFrameProcessor struct { - outFramesChan chan<- []byte - streams types.StreamStore + outFramesChan chan<- []byte + streamsStore types.StreamStore + streamsLimiter types.StreamsLimiter currentWindowUpdateFrame []byte nextWindowUpdateFrame []byte windowUpdateAcc int } -func newDataFrameProcessor(outFramesChan chan<- []byte, streams types.StreamStore) *dataFrameProcessor { +func newDataFrameProcessor( + outFramesChan chan<- []byte, + streamsStore types.StreamStore, + streamsLimiter types.StreamsLimiter, +) *dataFrameProcessor { windowUpdateFrame := [9 + 4]byte{0, 0, 4, byte(http2.FrameWindowUpdate)} return &dataFrameProcessor{ outFramesChan, - streams, + streamsStore, + streamsLimiter, windowUpdateFrame[:], bytes.Clone(windowUpdateFrame[:]), 0, @@ -162,7 +169,8 @@ func (p *dataFrameProcessor) Process(header frameheader.FrameHeader, _ []byte, i } if header.Flags().Has(http2.FlagDataEndStream) { - stream := p.streams.GetAndDelete(header.StreamID()) + p.streamsLimiter.Release() + stream := p.streamsStore.GetAndDelete(header.StreamID()) if stream != nil { stream.End() } @@ -172,13 +180,18 @@ func (p *dataFrameProcessor) Process(header frameheader.FrameHeader, _ []byte, i } type headersFrameProcessor struct { - streams types.StreamStore + streamsStore types.StreamStore + streamsLimiter types.StreamsLimiter + hpackDecoder *hpack.Decoder currentStream types.Stream } -func newHeadersFrameProcessor(streams types.StreamStore) *headersFrameProcessor { - p := &headersFrameProcessor{streams: streams} +func newHeadersFrameProcessor(store types.StreamStore, limiter types.StreamsLimiter) *headersFrameProcessor { + p := &headersFrameProcessor{ + streamsStore: store, + streamsLimiter: limiter, + } p.hpackDecoder = hpack.NewDecoder(4096, p.OnHeader) return p } @@ -189,7 +202,7 @@ func (p *headersFrameProcessor) OnHeader(f hpack.HeaderField) { func (p *headersFrameProcessor) Process(header frameheader.FrameHeader, payload []byte, incomplete bool) error { streamID := header.StreamID() - stream := p.streams.Get(streamID) + stream := p.streamsStore.Get(streamID) p.currentStream = stream p.hpackDecoder.SetEmitEnabled(stream != nil) @@ -204,18 +217,23 @@ func (p *headersFrameProcessor) Process(header frameheader.FrameHeader, payload if header.Flags().Has(http2.FlagHeadersEndStream) && stream != nil { stream.End() - p.streams.Delete(streamID) + p.streamsStore.Delete(streamID) + p.streamsLimiter.Release() } return nil } type rstStreamFrameProcessor struct { - streams types.StreamStore - errCode uint32 + streamsStore types.StreamStore + streamsLimiter types.StreamsLimiter + errCode uint32 } -func newRSTStreamFrameProcessor(streams types.StreamStore) *rstStreamFrameProcessor { - return &rstStreamFrameProcessor{streams, 0} +func newRSTStreamFrameProcessor( + streamsStore types.StreamStore, + streamsLimiter types.StreamsLimiter, +) *rstStreamFrameProcessor { + return &rstStreamFrameProcessor{streamsStore, streamsLimiter, 0} } func (p *rstStreamFrameProcessor) Process(header frameheader.FrameHeader, payload []byte, incomplete bool) error { @@ -229,7 +247,8 @@ func (p *rstStreamFrameProcessor) Process(header frameheader.FrameHeader, payloa errCode := http2.ErrCode(p.errCode) streamID := header.StreamID() - stream := p.streams.GetAndDelete(streamID) + p.streamsLimiter.Release() + stream := p.streamsStore.GetAndDelete(streamID) if stream != nil { stream.RSTStream(errCode) stream.End() @@ -238,15 +257,15 @@ func (p *rstStreamFrameProcessor) Process(header frameheader.FrameHeader, payloa } type windowUpdateFrameProcessor struct { - fcConn types.FlowControl - streams types.StreamStore - increment uint32 + fcConn types.FlowControl + streamsStore types.StreamStore + increment uint32 } func newWindowUpdateFrameProcessor( - streams types.StreamStore, fcConn types.FlowControl, + streamsStore types.StreamStore, fcConn types.FlowControl, ) *windowUpdateFrameProcessor { - return &windowUpdateFrameProcessor{fcConn, streams, 0} + return &windowUpdateFrameProcessor{fcConn, streamsStore, 0} } func (p *windowUpdateFrameProcessor) Process(header frameheader.FrameHeader, payload []byte, incomplete bool) error { @@ -263,7 +282,7 @@ func (p *windowUpdateFrameProcessor) Process(header frameheader.FrameHeader, pay if streamID == 0 { fc = p.fcConn } else { - stream := p.streams.Get(streamID) + stream := p.streamsStore.Get(streamID) if stream == nil { return nil } @@ -299,18 +318,20 @@ func (p *goAwayFrameProcessor) Process(_ frameheader.FrameHeader, payload []byte payload = payload[1:] p.errCode = (p.errCode << 8) | uint32(b) } + p.debugData = append(p.debugData, payload...) if incomplete { return nil } - code := http2.ErrCode(p.errCode) err := GoAwayError{ - Code: code, + Code: http2.ErrCode(p.errCode), LastStreamID: p.lastStreamID, DebugData: bytes.Clone(p.debugData), } + + p.index = 0 p.errCode = 0 p.lastStreamID = 0 p.debugData = p.debugData[:0] diff --git a/loader/reciever/processor_test.go b/loader/reciever/processor_test.go index a41f57a..8c3db08 100644 --- a/loader/reciever/processor_test.go +++ b/loader/reciever/processor_test.go @@ -131,14 +131,14 @@ func TestDataFrameProcessor(t *testing.T) { // do nothing on incomplete { - fp := newDataFrameProcessor(nil, nil) + fp := newDataFrameProcessor(nil, nil, nil) a.NoError(fp.Process(frameheader.FrameHeader{}, nil, true)) } // publish window update frame on acummulating more than trigger length { priorityFramesChan := make(chan []byte, 1) - fp := newDataFrameProcessor(priorityFramesChan, nil) + fp := newDataFrameProcessor(priorityFramesChan, nil, nil) fh := frameheader.NewFrameHeader() const streamID uint32 = 123 fh.SetStreamID(streamID) @@ -171,7 +171,7 @@ func TestDataFrameProcessor(t *testing.T) { } } - // do nothing on incomplete + // release stream on end flag { const streamID uint32 = 184921 stream := &StreamMock{ @@ -183,7 +183,10 @@ func TestDataFrameProcessor(t *testing.T) { return stream }, } - fp := newDataFrameProcessor(nil, streams) + streamsLimiter := &StreamsLimiterMock{ + ReleaseFunc: func() {}, + } + fp := newDataFrameProcessor(nil, streams, streamsLimiter) fh := frameheader.NewFrameHeader() fh.SetStreamID(streamID) fh.SetFlags(http2.FlagDataEndStream) @@ -209,6 +212,8 @@ func TestHeadersFrameProcessor(t *testing.T) { a.Equal(streamID, s) return stream }, + }, &StreamsLimiterMock{ + ReleaseFunc: func() {}, }) b := bytes.NewBuffer(nil) @@ -248,6 +253,8 @@ func TestHeadersFrameProcessor(t *testing.T) { return stream }, DeleteFunc: func(s uint32) { a.Equal(streamID, s) }, + }, &StreamsLimiterMock{ + ReleaseFunc: func() {}, }) fh := frameheader.NewFrameHeader() @@ -277,7 +284,9 @@ func TestRSTStreamFrameProcessor(t *testing.T) { return stream }, } - fp := newRSTStreamFrameProcessor(streams) + fp := newRSTStreamFrameProcessor(streams, &StreamsLimiterMock{ + ReleaseFunc: func() {}, + }) framer := newFramer() a.NoError(framer.WriteRSTStream(streamID, errCode)) diff --git a/loader/reciever/reciever.go b/loader/reciever/reciever.go index 10c3bab..8bb7f2e 100644 --- a/loader/reciever/reciever.go +++ b/loader/reciever/reciever.go @@ -45,13 +45,13 @@ func NewReciever( conn net.Conn, fcConn types.FlowControl, priorityFramesChan chan<- []byte, - streams types.StreamStore, + streams types.Streams, ) *Reciever { return &Reciever{ conn, make([]byte, consts.RecieveBufferSize), make([]byte, consts.RecieveBufferSize), - NewDefaultProcessor(streams, fcConn, priorityFramesChan), + NewDefaultProcessor(streams.Store, streams.Limiter, fcConn, priorityFramesChan), } } diff --git a/loader/sender/sender.go b/loader/sender/sender.go index 1a30be9..1a77819 100644 --- a/loader/sender/sender.go +++ b/loader/sender/sender.go @@ -7,9 +7,9 @@ import ( "time" "github.com/ozontech/framer/consts" - streamsPool "github.com/ozontech/framer/loader/streams/pool" "github.com/ozontech/framer/loader/types" hpackwrapper "github.com/ozontech/framer/utils/hpack_wrapper" + "go.uber.org/zap" ) type writeCmd struct { @@ -19,13 +19,15 @@ type writeCmd struct { type frame struct { chunks [3][]byte - types.Releaser + cbs [3]func() } type Sender struct { - maxFrameSize int - streamPool *streamsPool.StreamsPool - streamStore types.StreamStore + log *zap.Logger + + maxFrameSize int + maxHeaderListSize int + streams types.Streams fcConn types.FlowControl conn io.Writer @@ -36,36 +38,56 @@ type Sender struct { priorityFrameChan chan []byte frameChan chan frame + + disableSendBatching bool } func NewSender( + log *zap.Logger, conn io.Writer, fcConn types.FlowControl, - priorityChunkChan chan []byte, - streamPool *streamsPool.StreamsPool, - streamStore types.StreamStore, + priorityFrameChan chan []byte, + streams types.Streams, hpackEncWrapper *hpackwrapper.Wrapper, maxFrameSize int, + maxHeaderListSize int, + disableSendBatching bool, ) *Sender { return &Sender{ - maxFrameSize, - streamPool, - streamStore, + log, + maxFrameSize, maxHeaderListSize, + + streams, fcConn, conn, hpackEncWrapper, 1, make(chan writeCmd), - priorityChunkChan, - make(chan frame, 1000), + priorityFrameChan, + make(chan frame, 16), + disableSendBatching, } } func (s *Sender) Run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - go s.processLoop(ctx) + if s.disableSendBatching { + go instantProcessor( + ctx, + s.writeCmdChan, + s.priorityFrameChan, + s.frameChan, + ) + } else { + go batchedProcessor( + ctx, + s.writeCmdChan, + s.priorityFrameChan, + s.frameChan, + ) + } return s.sendLoop(ctx) } @@ -84,48 +106,115 @@ func (s *Sender) sendLoop(ctx context.Context) error { } } -func (s *Sender) processLoop(ctx context.Context) { - var ( - writeCmdChan chan<- writeCmd = s.writeCmdChan - priorityChunkChan <-chan []byte = s.priorityFrameChan - frameChan <-chan frame = s.frameChan - ) +func instantProcessor( + ctx context.Context, + writeCmdChan chan<- writeCmd, + priorityChunkChan <-chan []byte, + frameChan <-chan frame, +) { + buffers, cbs := make([][]byte, 0, 3), make(multiCB, 0, 3) + buffersNext, cbsNext := make([][]byte, 0, 3), make(multiCB, 0, 3) - rotator := newRotator() - buffers, releasers := rotator.Rotate() + doWrite := func() { + writeCmdChan <- writeCmd{buffers, cbs.Call} + buffers, cbs, buffersNext, cbsNext = buffersNext[:0], cbsNext[:0], buffers, cbs + } + +l: + for { + select { + case b := <-priorityChunkChan: + buffers = append(buffers, b) + default: + select { + case b := <-priorityChunkChan: + buffers = append(buffers, b) + case f, ok := <-frameChan: + if !ok { + break l + } + + for _, c := range f.chunks { + if c == nil { + break + } + buffers = append(buffers, c) + } + for _, cb := range f.cbs { + if cb != nil { + cbs = append(cbs, cb) + } + } + case <-ctx.Done(): + return + } + } + doWrite() + } + + for { + select { + case b := <-priorityChunkChan: + buffers = append(buffers, b) + case <-ctx.Done(): + return + } + doWrite() + } +} + +func batchedProcessor( + ctx context.Context, + writeCmdChan chan<- writeCmd, + priorityChunkChan <-chan []byte, + frameChan <-chan frame, +) { + buffers, cbs := make([][]byte, 1, 16), make(multiCB, 0, 16) + buffersNext, cbsNext := make([][]byte, 1, 16), make(multiCB, 0, 16) + + realTimer := time.NewTimer(consts.SendBatchTimeout) + var noopTimerChan <-chan time.Time + timerChan := noopTimerChan lastWrite := time.Now() - doWrite := func(b net.Buffers) { + doWrite := func(withFirst bool) { + var b net.Buffers + if withFirst { + b = buffers[:] + } else { + b = buffers[1:] + } if len(b) == 0 { return } - writeCmdChan <- writeCmd{b, releasers.Release} - buffers, releasers = rotator.Rotate() + writeCmdChan <- writeCmd{b, cbs.Call} + buffers, cbs, buffersNext, cbsNext = buffersNext[:1], cbsNext[:0], buffers, cbs lastWrite = time.Now() + timerChan = noopTimerChan } - timer := time.NewTimer(consts.SendBatchTimeout) l: for { - timer.Reset(consts.SendBatchTimeout - time.Since(lastWrite)) select { case b := <-priorityChunkChan: // если получили приритетный фрейм, то он должен записаться первым buffers[0] = b - doWrite(buffers) + doWrite(true) default: select { case b := <-priorityChunkChan: buffers[0] = b - doWrite(buffers) + doWrite(true) // тут обязательно реслайситься т.к. net.Buffers изменяет слайс case f, ok := <-frameChan: + timerChan = realTimer.C + if !ok { - doWrite(buffers[1:]) + doWrite(false) break l } - if len(f.chunks)+len(buffers) > cap(buffers) { - doWrite(buffers[1:]) + if len(f.chunks)+len(buffers) > 2048 { + doWrite(false) } for _, c := range f.chunks { @@ -135,18 +224,21 @@ l: buffers = append(buffers, c) } - if f.Releaser != nil { - releasers = append(releasers, f.Releaser) + for _, cb := range f.cbs { + if cb != nil { + cbs = append(cbs, cb) + } } case <-ctx.Done(): - doWrite(buffers[1:]) + doWrite(false) return // time.After аллоцирует память в куче т.к. на каждый вызов создает канал // => сильно медленее переиспользования таймера - case <-timer.C: - doWrite(buffers[1:]) + case <-timerChan: + doWrite(false) } } + realTimer.Reset(consts.SendBatchTimeout - time.Since(lastWrite)) } for { @@ -154,7 +246,7 @@ l: case b := <-priorityChunkChan: // если получили приритетный фрейм, то он должен записаться первым buffers[0] = b - doWrite(buffers) + doWrite(true) case <-ctx.Done(): return } @@ -165,79 +257,42 @@ func (s *Sender) Send(a types.Req) { s.send(a) } -// var n atomic.Int64 -// -// func init() { -// go func() { -// p := message.NewPrinter(language.English) -// for range time.Tick(time.Second) { -// p.Printf("%d\n", n.Swap(0)) -// } -// }() -// } -// -// const ( -// acquire = true -// setupRequest = true -// saveToStore = true -// ) -// -// func (s *Sender) sendNoop1(a types.Req) { -// n.Add(1) -// -// s.streamID += 2 -// if setupRequest { -// a.SetUp(s.streamID, s.hpackEncWrapper) -// } -// a.Release() -// -// if acquire { -// stream := s.streamPool.Acquire(s.streamID, a.Tag()) -// stream.SetSize(a.Size()) -// stream.End() -// -// if saveToStore { -// s.streamStore.Set(s.streamID, stream) -// s.streamStore.Delete(s.streamID) -// } -// } -// } -// -// func (s *Sender) sendNoop2(a types.Req) { -// n.Add(1) -// s.streamID += 2 -// // a.SetUp(s.streamID, s.hpackEncWrapper) -// -// stream := s.streamPool.Acquire(s.streamID, a.Tag()) -// a.Release() -// stream.End() -// } - func (s *Sender) send(a types.Req) { // n.Add(1) s.streamID += 2 - frames := a.SetUp(s.maxFrameSize, s.streamID, s.hpackEncWrapper) - stream := s.streamPool.Acquire(s.streamID, a.Tag()) - stream.SetSize(a.Size()) - s.streamStore.Set(s.streamID, stream) + s.streams.Limiter.WaitAllow() + stream := s.streams.Pool.Acquire(s.streamID, a.Tag()) - var ( - i int - lastIndex = len(frames) - 1 + frames, err := a.SetUp( + s.maxFrameSize, s.maxHeaderListSize, + s.streamID, s.hpackEncWrapper, ) - for { + if err != nil { + stream.RequestError(err) + stream.End() + return + } + stream.SetSize(a.Size()) + + s.streams.Store.Set(s.streamID, stream) + + lastIndex := len(frames) - 1 + for i := 0; i <= lastIndex; i++ { f := frames[i] if !stream.FC().Wait(f.FlowControlPrice) || !s.fcConn.Wait(f.FlowControlPrice) { return } + frame := frame{chunks: f.Chunks} + if i == 0 { + frame.cbs[0] = stream.FirstByteSent + } if i == lastIndex { - s.frameChan <- frame{chunks: f.Chunks, Releaser: a} - break + frame.cbs[1] = stream.LastByteSent + frame.cbs[2] = a.Release } - s.frameChan <- frame{chunks: f.Chunks} - i++ + s.frameChan <- frame } } @@ -245,30 +300,10 @@ func (s *Sender) Flush() { close(s.frameChan) } -type rotatorItem struct { - buffers [consts.ChunksBufferSize][]byte - releasers [consts.ChunksBufferSize]types.Releaser -} -type rotator struct { - // net.Buffers.WriteTo может уменьшать капасити слайса, - // поэтому, чтобы переиспользовать память используется массив, с которого создается слайс - current *rotatorItem - next *rotatorItem -} - -func newRotator() *rotator { - return &rotator{new(rotatorItem), new(rotatorItem)} -} - -func (r *rotator) Rotate() ([][]byte, releasers) { - r.current, r.next = r.next, r.current - return r.current.buffers[:1], r.current.releasers[:0] -} - -type releasers []types.Releaser +type multiCB []func() -func (rr releasers) Release() { - for _, r := range rr { - r.Release() +func (m multiCB) Call() { + for _, cb := range m { + cb() } } diff --git a/loader/streams/limiter/limiter.go b/loader/streams/limiter/limiter.go new file mode 100644 index 0000000..8cd5558 --- /dev/null +++ b/loader/streams/limiter/limiter.go @@ -0,0 +1,46 @@ +package limiter + +import ( + "sync" + + "github.com/ozontech/framer/loader/types" +) + +func New(quota uint32) types.StreamsLimiter { + if quota == 0 { + return noopLimiter{} + } + return newLimiter(quota) +} + +type noopLimiter struct{} + +func (noopLimiter) WaitAllow() {} +func (noopLimiter) Release() {} + +type limiter struct { + quota uint32 + cond *sync.Cond +} + +func newLimiter(quota uint32) *limiter { + return &limiter{quota, sync.NewCond(&sync.Mutex{})} +} + +func (l *limiter) WaitAllow() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + for l.quota == 0 { + l.cond.Wait() + } + + l.quota-- +} + +func (l *limiter) Release() { + l.cond.L.Lock() + defer l.cond.Signal() + defer l.cond.L.Unlock() + + l.quota++ +} diff --git a/loader/streams/pool/pool.go b/loader/streams/pool/pool.go index 9d3b7d0..efc0871 100644 --- a/loader/streams/pool/pool.go +++ b/loader/streams/pool/pool.go @@ -1,7 +1,6 @@ package pool import ( - "math" "sync" "github.com/ozontech/framer/consts" @@ -15,18 +14,16 @@ type StreamsPool struct { cond *sync.Cond pool []*streamImpl - inUse uint32 - maxConcurrentStreams uint32 - initialWindowSize uint32 + inUse uint32 + initialWindowSize uint32 } func NewStreamsPool(reporter types.LoaderReporter, opts ...Opt) *StreamsPool { p := &StreamsPool{ - reporter: reporter, - cond: sync.NewCond(&sync.Mutex{}), - pool: make([]*streamImpl, 0, 1024), - maxConcurrentStreams: math.MaxUint32, - initialWindowSize: consts.DefaultInitialWindowSize, + reporter: reporter, + cond: sync.NewCond(&sync.Mutex{}), + pool: make([]*streamImpl, 0, 1024), + initialWindowSize: consts.DefaultInitialWindowSize, } for _, o := range opts { o.apply(p) @@ -37,10 +34,6 @@ func NewStreamsPool(reporter types.LoaderReporter, opts ...Opt) *StreamsPool { func (p *StreamsPool) Acquire(streamID uint32, tag string) types.Stream { var stream *streamImpl p.cond.L.Lock() - if p.inUse >= p.maxConcurrentStreams { - p.cond.Wait() - } - p.inUse++ l := len(p.pool) if l > 0 { @@ -55,7 +48,7 @@ func (p *StreamsPool) Acquire(streamID uint32, tag string) types.Stream { stream.streamID = streamID stream.fc.Reset(p.initialWindowSize) - stream.StreamState = p.reporter.Acquire(tag) + stream.StreamState = p.reporter.Acquire(tag, streamID) return stream } @@ -110,12 +103,6 @@ type Opt interface { apply(*StreamsPool) } -type WithMaxConcurrentStreams uint32 - -func (s WithMaxConcurrentStreams) apply(p *StreamsPool) { - p.maxConcurrentStreams = uint32(s) -} - type WithInitialWindowSize uint32 func (s WithInitialWindowSize) apply(p *StreamsPool) { diff --git a/loader/streams/store/store.go b/loader/streams/store/store.go index 7027526..6319fe3 100644 --- a/loader/streams/store/store.go +++ b/loader/streams/store/store.go @@ -45,9 +45,9 @@ type StreamsMap struct { mu *sync.RWMutex } -func NewStreamsMap() *StreamsMap { +func NewStreamsMap(size int) *StreamsMap { return &StreamsMap{ - m: NewStreamsMapUnlocked(1024), + m: NewStreamsMapUnlocked(size), mu: &sync.RWMutex{}, } } diff --git a/loader/types/req.go b/loader/types/req.go index 5083e2c..5458828 100644 --- a/loader/types/req.go +++ b/loader/types/req.go @@ -4,11 +4,17 @@ import "io" type HPackFieldWriter interface { SetWriter(w io.Writer) - WriteField(k, v string) error + WriteField(k, v string) } type Req interface { - SetUp(maxFramePayloadLen int, streamID uint32, fieldWriter HPackFieldWriter) []Frame + SetUp( + maxFramePayloadLen int, + maxHeaderListSize int, + streamID uint32, + fieldWriter HPackFieldWriter, + ) ([]Frame, error) + FullMethodName() string Tag() string Size() int Releaser diff --git a/loader/types/stream.go b/loader/types/stream.go index 52205a0..d9b3fba 100644 --- a/loader/types/stream.go +++ b/loader/types/stream.go @@ -4,6 +4,21 @@ import ( "golang.org/x/net/http2" ) +type Streams struct { + Pool StreamsPool + Limiter StreamsLimiter + Store StreamStore +} + +type StreamsPool interface { + Acquire(streamID uint32, tag string) Stream +} + +type StreamsLimiter interface { + WaitAllow() // дождаться разрешение лимитера на создание нового стрима + Release() // сообщить о завершении стрима +} + type StreamStore interface { Set(uint32, Stream) // добавить стрим в хранилище Get(uint32) Stream // получить стрим из хранилища @@ -13,7 +28,7 @@ type StreamStore interface { } type LoaderReporter interface { - Acquire(tag string) StreamState + Acquire(tag string, streamID uint32) StreamState } type Reporter interface { @@ -23,12 +38,16 @@ type Reporter interface { } type StreamState interface { - SetSize(int) // сообщаем какой размер у данного унарного стрима - OnHeader(name, value string) // сообщаем хедеры по мере их получения - IoError(err error) // сообщаем стриму, что получили ошибку ввода/вывода - RSTStream(code http2.ErrCode) // если получили RST_STREAM - GoAway(code http2.ErrCode) // получен фрейм goaway со stream id > текущего - End() // завершение стрима. отправляет результат в отчет + RequestError(error) // не смогли подготовить запрос: не смогли декодировать запрос, привысили лимиты сервера (SettingMaxHeaderListSize) etc + FirstByteSent() // отправили первый байт + LastByteSent() // отправили последний байт + SetSize(int) // сообщаем какой размер у данного унарного стрима + OnHeader(name, value string) // сообщаем хедеры по мере их получения + IoError(error) // сообщаем стриму, что получили ошибку ввода/вывода + RSTStream(code http2.ErrCode) // если получили RST_STREAM + GoAway(code http2.ErrCode, debugData []byte) // получен фрейм goaway со stream id > текущего + Timeout() // случился таймаут стрима + End() // завершение стрима. отправляет результат в отчет } type Stream interface { diff --git a/report/multi/multi.go b/report/multi/multi.go index bdade9b..17e60e0 100644 --- a/report/multi/multi.go +++ b/report/multi/multi.go @@ -37,20 +37,32 @@ func (m *Multi) Close() error { return g.Wait() } -func (m *Multi) Acquire(tag string) types.StreamState { +func (m *Multi) Acquire(tag string, streamID uint32) types.StreamState { ms, ok := m.pool.Acquire() if !ok { ms = make(multiState, len(m.nested)) } for i, r := range m.nested { - ms[i] = r.Acquire(tag) + ms[i] = r.Acquire(tag, streamID) } return ms } type multiState []types.StreamState +func (s multiState) FirstByteSent() { + for _, s := range s { + s.FirstByteSent() + } +} + +func (s multiState) LastByteSent() { + for _, s := range s { + s.FirstByteSent() + } +} + func (s multiState) SetSize(n int) { for _, s := range s { s.SetSize(n) @@ -69,15 +81,27 @@ func (s multiState) RSTStream(code http2.ErrCode) { } } +func (s multiState) RequestError(err error) { + for _, s := range s { + s.RequestError(err) + } +} + func (s multiState) IoError(err error) { for _, s := range s { s.IoError(err) } } -func (s multiState) GoAway(errCode http2.ErrCode) { +func (s multiState) GoAway(errCode http2.ErrCode, debugData []byte) { + for _, s := range s { + s.GoAway(errCode, debugData) + } +} + +func (s multiState) Timeout() { for _, s := range s { - s.GoAway(errCode) + s.Timeout() } } diff --git a/report/noop/noop.go b/report/noop/noop.go index db2c25d..98c0fbd 100644 --- a/report/noop/noop.go +++ b/report/noop/noop.go @@ -23,15 +23,19 @@ func (m *Noop) Close() error { return nil } -func (m *Noop) Acquire(string) types.StreamState { +func (m *Noop) Acquire(string, uint32) types.StreamState { return &noopState{} } type noopState struct{} -func (s *noopState) SetSize(int) {} -func (s *noopState) OnHeader(string, string) {} -func (s *noopState) RSTStream(http2.ErrCode) {} -func (s *noopState) IoError(error) {} -func (s *noopState) GoAway(http2.ErrCode) {} -func (s *noopState) End() {} +func (s *noopState) FirstByteSent() {} +func (s *noopState) LastByteSent() {} +func (s *noopState) RequestError(error) {} +func (s *noopState) SetSize(int) {} +func (s *noopState) OnHeader(string, string) {} +func (s *noopState) RSTStream(http2.ErrCode) {} +func (s *noopState) IoError(error) {} +func (s *noopState) GoAway(http2.ErrCode, []byte) {} +func (s *noopState) Timeout() {} +func (s *noopState) End() {} diff --git a/report/phout/phout.go b/report/phout/phout.go index 0dadd48..903d540 100644 --- a/report/phout/phout.go +++ b/report/phout/phout.go @@ -21,16 +21,14 @@ type Reporter struct { w *bufio.Writer ch chan *streamState pool *pool.SlicePool[*streamState] - timeout time.Duration } -func New(w io.Writer, timeout time.Duration) *Reporter { +func New(w io.Writer) *Reporter { return &Reporter{ make(chan struct{}), bufio.NewWriter(w), make(chan *streamState, 256), pool.NewSlicePoolSize[*streamState](256), - timeout, } } @@ -50,13 +48,12 @@ func (r *Reporter) Close() error { return nil } -func (r *Reporter) Acquire(tag string) types.StreamState { +func (r *Reporter) Acquire(tag string, streamID uint32) types.StreamState { ss, ok := r.pool.Acquire() if !ok { ss = &streamState{ reportLine: make([]byte, 128), reporter: r, - timeout: r.timeout, } } ss.reset(tag) @@ -71,14 +68,15 @@ type streamState struct { reportLine []byte reporter *Reporter - timeout time.Duration grpcCodeHeader string http2CodeHeader string + requestError error ioErr error rstStreamCode *http2.ErrCode goAwayCode *http2.ErrCode + timeouted bool reqSize int startTime time.Time @@ -92,13 +90,18 @@ func (s *streamState) reset(tag string) { s.grpcCodeHeader = "" s.http2CodeHeader = "" + s.timeouted = false + s.requestError = nil s.ioErr = nil s.goAwayCode = nil s.rstStreamCode = nil s.reqSize = 0 } +func (s *streamState) FirstByteSent() {} +func (s *streamState) LastByteSent() {} + func (s *streamState) SetSize(size int) { s.reqSize = size } @@ -112,6 +115,10 @@ func (s *streamState) OnHeader(name, value string) { } } +func (s *streamState) RequestError(err error) { + s.requestError = err +} + func (s *streamState) IoError(err error) { s.ioErr = err } @@ -120,10 +127,14 @@ func (s *streamState) RSTStream(code http2.ErrCode) { s.rstStreamCode = &code } -func (s *streamState) GoAway(code http2.ErrCode) { +func (s *streamState) GoAway(code http2.ErrCode, _ []byte) { s.goAwayCode = &code } +func (s *streamState) Timeout() { + s.timeouted = true +} + const tabChar = '\t' func (s *streamState) result() []byte { @@ -169,13 +180,15 @@ func (s *streamState) result() []byte { } // keyProtoCode switch { + case s.requestError != nil: + s.reportLine = append(s.reportLine, "client_error"...) case s.rstStreamCode != nil: s.reportLine = append(s.reportLine, "rst_"...) s.reportLine = strconv.AppendInt(s.reportLine, int64(*s.rstStreamCode), 10) case s.goAwayCode != nil: s.reportLine = append(s.reportLine, "goaway_"...) s.reportLine = strconv.AppendInt(s.reportLine, int64(*s.goAwayCode), 10) - case s.endTime.Sub(s.startTime) > s.timeout: + case s.timeouted: s.reportLine = append(s.reportLine, "grpc_4"...) case s.http2CodeHeader == "": s.reportLine = append(s.reportLine, "http2_1"...) // protocol errro diff --git a/report/phout/phout_test.go b/report/phout/phout_test.go index 5924a7e..8119fb7 100644 --- a/report/phout/phout_test.go +++ b/report/phout/phout_test.go @@ -12,13 +12,15 @@ import ( "golang.org/x/net/http2" ) +const streamID = 1 + func TestPhout(t *testing.T) { t.Parallel() a := assert.New(t) const timeout = 11 * time.Second b := new(bytes.Buffer) - r := New(b, timeout) + r := New(b) errChan := make(chan error) go func() { errChan <- r.Run() @@ -30,7 +32,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("tag1") + state := r.Acquire("tag1", streamID) state.SetSize(111) state.OnHeader(":status", "200") state.OnHeader("grpc-status", "0") @@ -50,7 +52,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("tag2") + state := r.Acquire("tag2", streamID) state.SetSize(222) state.OnHeader(":status", "200") state.OnHeader("grpc-status", "0") @@ -71,7 +73,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("tag2") + state := r.Acquire("tag2", streamID) state.SetSize(222) state.OnHeader(":status", "200") state.OnHeader("grpc-status", "0") @@ -92,7 +94,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") + state := r.Acquire("", streamID) state.RSTStream(http2.ErrCodeInternal) endTime := time.Now() @@ -110,8 +112,8 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") - state.GoAway(http2.ErrCodeInternal) + state := r.Acquire("", streamID) + state.GoAway(http2.ErrCodeInternal, nil) endTime := time.Now() now = func() time.Time { return endTime } @@ -128,10 +130,11 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") + state := r.Acquire("", streamID) endTime := startTime.Add(timeout + 1) now = func() time.Time { return endTime } + state.Timeout() state.End() expected += fmt.Sprintf( @@ -145,7 +148,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") + state := r.Acquire("", streamID) endTime := time.Now() now = func() time.Time { return endTime } @@ -162,7 +165,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") + state := r.Acquire("", streamID) endTime := time.Now() now = func() time.Time { return endTime } @@ -181,7 +184,7 @@ func TestPhout(t *testing.T) { startTime := time.Now() now = func() time.Time { return startTime } - state := r.Acquire("") + state := r.Acquire("", streamID) endTime := time.Now() now = func() time.Time { return endTime } @@ -198,6 +201,5 @@ func TestPhout(t *testing.T) { r.Close() a.NoError(<-errChan) - a.Equal(expected, b.String()) } diff --git a/report/simple/simple.go b/report/simple/simple.go index 2608d12..b96957e 100644 --- a/report/simple/simple.go +++ b/report/simple/simple.go @@ -56,13 +56,13 @@ func (a *Reporter) Close() error { return nil } -func (a *Reporter) Acquire(tag string) types.StreamState { +func (a *Reporter) Acquire(_ string, _ uint32) types.StreamState { a.req.Add(1) ss, ok := a.pool.Acquire() if !ok { ss = &streamState{reporter: a} } - ss.reset(tag) + ss.reset() return ss } @@ -109,21 +109,18 @@ func (a *Reporter) report(now time.Time) { type streamState struct { reporter *Reporter size int - timeout time.Duration + timeouted bool grpcCodeStr string grpcMessageStr string code http2.ErrCode goAway bool ioErr error - - ctime time.Time - tag string + requestErr error } -func (s *streamState) reset(tag string) { - s.tag = tag - s.ctime = time.Now() +func (s *streamState) reset() { + s.timeouted = false s.size = 0 s.grpcCodeStr = "" @@ -132,6 +129,9 @@ func (s *streamState) reset(tag string) { s.ioErr = nil } +func (s *streamState) FirstByteSent() {} +func (s *streamState) LastByteSent() {} + func (s *streamState) SetSize(size int) { s.reporter.addSize(size) } @@ -145,6 +145,10 @@ func (s *streamState) OnHeader(name, value string) { } } +func (s *streamState) RequestError(err error) { + s.requestErr = err +} + func (s *streamState) IoError(err error) { s.ioErr = err } @@ -153,17 +157,22 @@ func (s *streamState) RSTStream(code http2.ErrCode) { s.code = code } -func (s *streamState) GoAway(http2.ErrCode) { +func (s *streamState) GoAway(http2.ErrCode, []byte) { s.goAway = true } +func (s *streamState) Timeout() { + s.timeouted = true +} + func (s *streamState) result() (ok bool) { - now := time.Now() switch { - case s.ctime.Add(s.timeout).Before(now): + case s.timeouted: return false case s.code != http2.ErrCodeNo: return false + case s.requestErr != nil: + return false case s.ioErr != nil: return false case s.goAway: diff --git a/report/supersimple/supersimple.go b/report/supersimple/supersimple.go index c26da75..d533f4d 100644 --- a/report/supersimple/supersimple.go +++ b/report/supersimple/supersimple.go @@ -3,7 +3,6 @@ package supersimple import ( "fmt" "sync/atomic" - "syscall" "time" "github.com/dustin/go-humanize" @@ -16,8 +15,6 @@ type Reporter struct { pool *pool.SlicePool[*streamState] closeCh chan struct{} - timeout time.Duration - start time.Time ok atomic.Uint32 nook atomic.Uint32 @@ -31,14 +28,13 @@ type Reporter struct { lastTime time.Time } -func New(timeout time.Duration) *Reporter { +func New() *Reporter { now := time.Now() return &Reporter{ pool: pool.NewSlicePoolSize[*streamState](100), closeCh: make(chan struct{}), start: now, lastTime: now, - timeout: timeout, } } @@ -61,11 +57,11 @@ func (a *Reporter) Close() error { return nil } -func (a *Reporter) Acquire(tag string) types.StreamState { +func (a *Reporter) Acquire(tag string, streamID uint32) types.StreamState { a.req.Add(1) ss, ok := a.pool.Acquire() if !ok { - ss = &streamState{reporter: a, ctime: new(syscall.Timeval)} + ss = &streamState{reporter: a} } ss.reset(tag) return ss @@ -73,9 +69,9 @@ func (a *Reporter) Acquire(tag string) types.StreamState { func (a *Reporter) accept(s *streamState) { if s.result() { - a.nook.Add(1) - } else { a.ok.Add(1) + } else { + a.nook.Add(1) } a.pool.Release(s) @@ -116,22 +112,15 @@ func (a *Reporter) report(now time.Time) { type streamState struct { reporter *Reporter noOk bool - ctime *syscall.Timeval -} - -func init() { - ctime := new(syscall.Timeval) - err := syscall.Gettimeofday(ctime) - if err != nil { - panic("Gettimeofday syscall unavailable?: " + err.Error()) - } } func (s *streamState) reset(_ string) { - syscall.Gettimeofday(s.ctime) //nolint:errcheck // syscall checked above s.noOk = false } +func (s *streamState) FirstByteSent() {} +func (s *streamState) LastByteSent() {} + func (s *streamState) SetSize(size int) { s.reporter.addSize(size) } @@ -142,18 +131,14 @@ func (s *streamState) OnHeader(name, value string) { } } -func (s *streamState) IoError(error) { s.noOk = true } -func (s *streamState) RSTStream(http2.ErrCode) { s.noOk = true } -func (s *streamState) GoAway(http2.ErrCode) { s.noOk = true } +func (s *streamState) RequestError(error) { s.noOk = true } +func (s *streamState) IoError(error) { s.noOk = true } +func (s *streamState) RSTStream(http2.ErrCode) { s.noOk = true } +func (s *streamState) GoAway(http2.ErrCode, []byte) { s.noOk = true } +func (s *streamState) Timeout() { s.noOk = true } func (s *streamState) result() (ok bool) { - if s.noOk { - return false - } - if time.Duration(s.ctime.Nano()) > s.reporter.timeout { - return false - } - return true + return !s.noOk } func (s *streamState) End() { diff --git a/utils/hpack_wrapper/wrapper.go b/utils/hpack_wrapper/wrapper.go index f4e2156..f6ced7a 100644 --- a/utils/hpack_wrapper/wrapper.go +++ b/utils/hpack_wrapper/wrapper.go @@ -17,12 +17,14 @@ func NewWrapper(opts ...Opt) *Wrapper { for _, o := range opts { o.apply(wrapper) } + return wrapper } func (ww *Wrapper) SetWriter(w io.Writer) { ww.Writer = w } -func (ww *Wrapper) WriteField(k, v string) error { - return ww.enc.WriteField(hpack.HeaderField{ +func (ww *Wrapper) WriteField(k, v string) { + //nolint:errcheck // всегда пишем в буфер, это безопасно + ww.enc.WriteField(hpack.HeaderField{ Name: k, Value: v, }) diff --git a/utils/lru/lru.go b/utils/lru/lru.go new file mode 100644 index 0000000..b11ee0a --- /dev/null +++ b/utils/lru/lru.go @@ -0,0 +1,47 @@ +package lru + +import ( + "container/list" + "sync" +) + +type LRU struct { + maxSize int + items map[string]*list.Element + list *list.List + mu sync.Mutex +} + +func New(maxSize int) *LRU { + if maxSize < 1 { + panic("assertion error: maxSize < 1") + } + return &LRU{ + maxSize: maxSize, + items: make(map[string]*list.Element, maxSize), + list: list.New(), + } +} + +// Get fetch item from lru and increase eviction order +func (l *LRU) GetOrAdd(keyB []byte) string { + l.mu.Lock() + defer l.mu.Unlock() + + element, ok := l.items[string(keyB)] + if ok { + l.list.MoveToFront(element) + return element.Value.(string) + } + + if len(l.items) >= l.maxSize { + element := l.list.Back() + l.list.Remove(element) + delete(l.items, element.Value.(string)) + } + + keyS := string(keyB) + element = l.list.PushFront(keyS) + l.items[keyS] = element + return keyS +} diff --git a/utils/lru/lru_test.go b/utils/lru/lru_test.go new file mode 100644 index 0000000..50abf85 --- /dev/null +++ b/utils/lru/lru_test.go @@ -0,0 +1,31 @@ +package lru + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLRU(t *testing.T) { + a := assert.New(t) + l := New(3) + l.GetOrAdd([]byte("one")) + l.GetOrAdd([]byte("two")) + l.GetOrAdd([]byte("three")) + l.GetOrAdd([]byte("one")) + a.Len(l.items, 3) + a.Equal(l.list.Len(), 3) + l.GetOrAdd([]byte("four")) + a.Len(l.items, 3) + a.Equal(l.list.Len(), 3) + + lruOrder := []string{"four", "one", "three"} + a.Len(l.items, len(lruOrder)) + el := l.list.Front() + for _, v := range lruOrder { + _, ok := l.items[v] + a.True(ok) + a.Equal(el.Value, v) + el = el.Next() + } +}