Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
kl943 committed Dec 19, 2024
1 parent 44c5adf commit 8767f7b
Show file tree
Hide file tree
Showing 17 changed files with 1,277 additions and 151 deletions.
645 changes: 645 additions & 0 deletions internal/interceptor/interceptortest/outbound.go

Large diffs are not rendered by default.

196 changes: 133 additions & 63 deletions internal/interceptor/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,60 @@ package interceptor
import (
"context"

"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/api/transport"
)

type (
// UnaryOutboundChain defines the interface for a chain of unary outbound requests.
// It provides methods to invoke the next outbound in the chain with the given context
// and request, and to retrieve the outbound component of the chain.
//
// Next: Executes the next outbound request in the chain with the provided context and request,
// returning the response and any error encountered during the process.
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
UnaryOutboundChain interface {
Next(ctx context.Context, request *transport.Request) (*transport.Response, error)
Outbound() Outbound
}

// OnewayOutboundChain defines the interface for a chain of one-way outbound requests.
// It provides methods to invoke the next outbound in the chain with the given context
// and request, and to retrieve the outbound component of the chain.
//
// Next: Executes the next one-way outbound request in the chain with the provided context and request,
// returning an acknowledgment and any error encountered during the process.
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
OnewayOutboundChain interface {
Next(ctx context.Context, request *transport.Request) (transport.Ack, error)
Outbound() Outbound
}

// StreamOutboundChain defines the interface for a chain of streaming outbound requests.
// It provides methods to invoke the next outbound in the chain with the given context
// and request, and to retrieve the outbound component of the chain.
//
// Next: Executes the next streaming outbound request in the chain with the provided context and request,
// returning a client stream and any error encountered during the process.
// Outbound: Retrieves the outbound component of the chain, allowing for further inspection or manipulation.
StreamOutboundChain interface {
Next(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error)
Outbound() Outbound
}

// UnaryOutbound defines transport interceptor for `UnaryOutbound`s.
//
// UnaryOutbound interceptor MAY do zero or more of the following: change the
// context, change the request, change the returned response, handle the
// returned error, call the given outbound zero or more times.
//
// UnaryOutbound interceptor MUST always return a non-nil Response or error,
// and they MUST be thread-safe
// and they MUST be thread-safe.
//
// UnaryOutbound interceptor is re-used across requests and MAY be called
// multiple times on the same request.
UnaryOutbound = middleware.UnaryOutbound
UnaryOutbound interface {
Call(ctx context.Context, request *transport.Request, out UnaryOutboundChain) (*transport.Response, error)
}

// OnewayOutbound defines transport interceptor for `OnewayOutbound`s.
//
Expand All @@ -52,7 +89,9 @@ type (
//
// OnewayOutbound interceptor is re-used across requests and MAY be called
// multiple times on the same request.
OnewayOutbound = middleware.OnewayOutbound
OnewayOutbound interface {
CallOneway(ctx context.Context, request *transport.Request, out OnewayOutboundChain) (transport.Ack, error)
}

// StreamOutbound defines transport interceptor for `StreamOutbound`s.
//
Expand All @@ -61,99 +100,130 @@ type (
// returned error, call the given outbound zero or more times.
//
// StreamOutbound interceptor MUST always return a non-nil Stream or error,
// and they MUST be thread-safe
// and they MUST be thread-safe.
//
// StreamOutbound interceptors is re-used across requests and MAY be called
// multiple times on the same request.
StreamOutbound = middleware.StreamOutbound
StreamOutbound interface {
CallStream(ctx context.Context, req *transport.StreamRequest, out StreamOutboundChain) (*transport.ClientStream, error)
}
)

var (
_ transport.UnaryOutbound = UnaryOutboundFunc(nil)
_ transport.OnewayOutbound = OnewayOutboundFunc(nil)
_ transport.StreamOutbound = StreamOutboundFunc(nil)
)

// UnaryOutboundFunc defines a function type that implements the UnaryOutbound interface.
type UnaryOutboundFunc func(context.Context, *transport.Request) (*transport.Response, error)
// Outbound is the common interface for all outbounds.
//
// Outbounds should also implement the Namer interface so that YARPC can
// properly update the Request.Transport field.
type Outbound interface {
transport.Lifecycle

// Call calls the UnaryOutbound function.
func (f UnaryOutboundFunc) Call(ctx context.Context, req *transport.Request) (*transport.Response, error) {
return f(ctx, req)
// Transports returns the transports that used by this outbound, so they
// can be collected for lifecycle management, typically by a Dispatcher.
//
// Though most outbounds only use a single transport, composite outbounds
// may use multiple transport protocols, particularly for shadowing traffic
// across multiple transport protocols during a transport protocol
// migration.
Transports() []transport.Transport
}

// Start starts the UnaryOutbound function. This is a no-op in this implementation.
func (f UnaryOutboundFunc) Start() error {
return nil
// DirectUnaryOutbound is a transport that knows how to send unary requests for procedure
// calls.
type DirectUnaryOutbound interface {
Outbound

// DirectCall is called without interceptor.
DirectCall(ctx context.Context, request *transport.Request) (*transport.Response, error)
}

// Stop stops the UnaryOutbound function. This is a no-op in this implementation.
func (f UnaryOutboundFunc) Stop() error {
return nil
// DirectOnewayOutbound defines a transport outbound for oneway requests
// that does not involve any interceptors.
type DirectOnewayOutbound interface {
Outbound

// DirectCallOneway is called without interceptor.
DirectCallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error)
}

// IsRunning returns whether the UnaryOutbound function is running. This is a no-op in this implementation.
func (f UnaryOutboundFunc) IsRunning() bool {
return false
// DirectStreamOutbound defines a transport outbound for streaming requests
// that does not involve any interceptors.
type DirectStreamOutbound interface {
Outbound

// DirectCallStream is called without interceptor.
DirectCallStream(ctx context.Context, req *transport.StreamRequest) (*transport.ClientStream, error)
}

// Transports returns the transports used by the UnaryOutbound function. This is a no-op in this implementation.
func (f UnaryOutboundFunc) Transports() []transport.Transport {
return nil
type nopUnaryOutbound struct{}

func (nopUnaryOutbound) Call(ctx context.Context, request *transport.Request, out UnaryOutboundChain) (*transport.Response, error) {
return out.Next(ctx, request)
}

// OnewayOutboundFunc defines a function type that implements the OnewayOutbound interface.
type OnewayOutboundFunc func(context.Context, *transport.Request) (transport.Ack, error)
// NopUnaryOutbound is a unary outbound middleware that does not do
// anything special. It simply calls the underlying UnaryOutbound.
var NopUnaryOutbound UnaryOutbound = nopUnaryOutbound{}

type nopOnewayOutbound struct{}

// CallOneway calls the OnewayOutbound function.
func (f OnewayOutboundFunc) CallOneway(ctx context.Context, req *transport.Request) (transport.Ack, error) {
return f(ctx, req)
func (nopOnewayOutbound) CallOneway(ctx context.Context, request *transport.Request, out OnewayOutboundChain) (transport.Ack, error) {
return out.Next(ctx, request)
}

// Start starts the OnewayOutbound function. This is a no-op in this implementation.
func (f OnewayOutboundFunc) Start() error {
return nil
// NopOnewayOutbound is an oneway outbound middleware that does not do
// anything special. It simply calls the underlying OnewayOutbound.
var NopOnewayOutbound OnewayOutbound = nopOnewayOutbound{}

type nopStreamOutbound struct{}

func (nopStreamOutbound) CallStream(ctx context.Context, requestMeta *transport.StreamRequest, out StreamOutboundChain) (*transport.ClientStream, error) {
return out.Next(ctx, requestMeta)
}

// Stop stops the OnewayOutbound function. This is a no-op in this implementation.
func (f OnewayOutboundFunc) Stop() error {
return nil
// NopStreamOutbound is a stream outbound middleware that does not do
// anything special. It simply calls the underlying StreamOutbound.
var NopStreamOutbound StreamOutbound = nopStreamOutbound{}

// ApplyUnaryOutbound applies the given UnaryOutbound interceptor to the given DirectUnaryOutbound transport.
func ApplyUnaryOutbound(uo UnaryOutboundChain, i UnaryOutbound) transport.UnaryOutbound {
return unaryOutboundWithInterceptor{uo: uo, i: i}
}

// IsRunning returns whether the OnewayOutbound function is running. This is a no-op in this implementation.
func (f OnewayOutboundFunc) IsRunning() bool {
return false
// ApplyOnewayOutbound applies the given OnewayOutbound interceptor to the given DirectOnewayOutbound transport.
func ApplyOnewayOutbound(oo OnewayOutboundChain, i OnewayOutbound) transport.OnewayOutbound {
return onewayOutboundWithInterceptor{oo: oo, i: i}
}

// Transports returns the transports used by the OnewayOutbound function. This is a no-op in this implementation.
func (f OnewayOutboundFunc) Transports() []transport.Transport {
return nil
// ApplyStreamOutbound applies the given StreamOutbound interceptor to the given DirectStreamOutbound transport.
func ApplyStreamOutbound(so StreamOutboundChain, i StreamOutbound) transport.StreamOutbound {
return streamOutboundWithInterceptor{so: so, i: i}
}

// StreamOutboundFunc defines a function type that implements the StreamOutbound interface.
type StreamOutboundFunc func(context.Context, *transport.StreamRequest) (*transport.ClientStream, error)
type unaryOutboundWithInterceptor struct {
transport.Outbound
uo UnaryOutboundChain
i UnaryOutbound
}

// CallStream calls the StreamOutbound function.
func (f StreamOutboundFunc) CallStream(ctx context.Context, req *transport.StreamRequest) (*transport.ClientStream, error) {
return f(ctx, req)
func (uoc unaryOutboundWithInterceptor) Call(ctx context.Context, request *transport.Request) (*transport.Response, error) {
return uoc.i.Call(ctx, request, uoc.uo)
}

// Start starts the StreamOutbound function. This is a no-op in this implementation.
func (f StreamOutboundFunc) Start() error {
return nil
type onewayOutboundWithInterceptor struct {
transport.Outbound
oo OnewayOutboundChain
i OnewayOutbound
}

// Stop stops the StreamOutbound function. This is a no-op in this implementation.
func (f StreamOutboundFunc) Stop() error {
return nil
func (ooc onewayOutboundWithInterceptor) CallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error) {
return ooc.i.CallOneway(ctx, request, ooc.oo)
}

// IsRunning returns whether the StreamOutbound function is running. This is a no-op in this implementation.
func (f StreamOutboundFunc) IsRunning() bool {
return false
type streamOutboundWithInterceptor struct {
transport.Outbound
so StreamOutboundChain
i StreamOutbound
}

// Transports returns the transports used by the StreamOutbound function. This is a no-op in this implementation.
func (f StreamOutboundFunc) Transports() []transport.Transport {
return nil
func (soc streamOutboundWithInterceptor) CallStream(ctx context.Context, requestMeta *transport.StreamRequest) (*transport.ClientStream, error) {
return soc.i.CallStream(ctx, requestMeta, soc.so)
}
130 changes: 130 additions & 0 deletions internal/interceptor/outboundinterceptor/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2024 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package outboundinterceptor

import (
"context"

"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/internal/interceptor"
)

// NewUnaryChain combines a series of `UnaryInbound`s into a single `InboundMiddleware`.
func NewUnaryChain(out interceptor.DirectUnaryOutbound, list []interceptor.UnaryOutbound) interceptor.UnaryOutboundChain {
return unaryChainExec{
Chain: list,
Final: out,
}
}

func (x unaryChainExec) Next(ctx context.Context, request *transport.Request) (*transport.Response, error) {
if len(x.Chain) == 0 {
return x.Final.DirectCall(ctx, request)
}
next := x.Chain[0]
x.Chain = x.Chain[1:]
return next.Call(ctx, request, x)
}

func (x unaryChainExec) Outbound() interceptor.Outbound {
return x.Final
}

// unaryChainExec adapts a series of `UnaryOutbound`s into a `UnaryOutbound`. It
// is scoped to a single call of a UnaryOutbound and is not thread-safe.
type unaryChainExec struct {
Chain []interceptor.UnaryOutbound
Final interceptor.DirectUnaryOutbound
}

// NewOnewayChain combines a series of `OnewayInbound`s into a single `InboundMiddleware`.
func NewOnewayChain(out interceptor.DirectOnewayOutbound, list []interceptor.OnewayOutbound) interceptor.OnewayOutboundChain {
return onewayChainExec{
Chain: list,
Final: out,
}
}

func (x onewayChainExec) Next(ctx context.Context, request *transport.Request) (transport.Ack, error) {
if len(x.Chain) == 0 {
return x.Final.DirectCallOneway(ctx, request)
}
next := x.Chain[0]
x.Chain = x.Chain[1:]
return next.CallOneway(ctx, request, x)
}

func (x onewayChainExec) Outbound() interceptor.Outbound {
return x.Final
}

// onewayChainExec adapts a series of `OnewayOutbound`s into a `OnewayOutbound`. It
// is scoped to a single call of a OnewayOutbound and is not thread-safe.
type onewayChainExec struct {
Chain []interceptor.OnewayOutbound
Final interceptor.DirectOnewayOutbound
}

func (x onewayChainExec) DirectCallOneway(ctx context.Context, request *transport.Request) (transport.Ack, error) {
if len(x.Chain) == 0 {
return x.Final.DirectCallOneway(ctx, request)
}
next := x.Chain[0]
x.Chain = x.Chain[1:]
return next.CallOneway(ctx, request, x)
}

// NewStreamChain combines a series of `OnewayInbound`s into a single `InboundMiddleware`.
func NewStreamChain(out interceptor.DirectStreamOutbound, list []interceptor.StreamOutbound) interceptor.StreamOutboundChain {
return streamChainExec{
Chain: list,
Final: out,
}
}

func (x streamChainExec) Next(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error) {
if len(x.Chain) == 0 {
return x.Final.DirectCallStream(ctx, request)
}
next := x.Chain[0]
x.Chain = x.Chain[1:]
return next.CallStream(ctx, request, x)
}

func (x streamChainExec) Outbound() interceptor.Outbound {
return x.Final
}

// streamChainExec adapts a series of `StreamOutbound`s into a `StreamOutbound`. It
// is scoped to a single call of a StreamOutbound and is not thread-safe.
type streamChainExec struct {
Chain []interceptor.StreamOutbound
Final interceptor.DirectStreamOutbound
}

func (x streamChainExec) DirectCallStream(ctx context.Context, request *transport.StreamRequest) (*transport.ClientStream, error) {
if len(x.Chain) == 0 {
return x.Final.DirectCallStream(ctx, request)
}
next := x.Chain[0]
x.Chain = x.Chain[1:]
return next.CallStream(ctx, request, x)
}
Loading

0 comments on commit 8767f7b

Please sign in to comment.