Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
Add docs to TCPClient
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaralo committed Oct 27, 2023
1 parent 5fa756d commit e6fcdfa
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions pkg/transport/network/tcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,33 @@ import (
)

var (
// CLIENT_INITIAL_BACKOFF is the starting value for the client backoff
CLIENT_INITIAL_BACKOFF = time.Millisecond * 100
CLIENT_MAX_BACKOFF = time.Second * 5
// CLIENT_MAX_BACKOFF is the max value for the client backoff
CLIENT_MAX_BACKOFF = time.Second * 5
)

// backoff is an abstraction over an exponential backoff algorithmn
type backoff struct {
base time.Duration
current time.Duration
rate float32
max time.Duration
}

// next returns a channel where a message will be sent once the next period
// of time elapses.
func (b *backoff) next() <-chan time.Time {
timer := time.NewTimer(b.current)
return timer.C
}

// reset resets the current value of the backoff to the default one
func (b *backoff) reset() {
b.current = b.base
}

// increase sets the current value of the backoff to the next one
func (b *backoff) increase() {
b.current = time.Duration(float32(b.current) * b.rate)
if b.current > b.max {
Expand All @@ -40,6 +47,10 @@ func (b *backoff) increase() {
// Assertion to check the TCPClient is a TCPSource
var _ TCPSource = &TCPClient{}

// TCPClient is a TCPSource that creates connections with a server.
//
// TCPClient must be used the same way as a regular TCPSource, extra methods provided
// are just to configure certain aspects about it.
type TCPClient struct {
target abstraction.TransportTarget
raddr net.Addr
Expand All @@ -49,6 +60,12 @@ type TCPClient struct {
retry backoff
}

// NewClient creates a new TCPClient for the specified socket.
//
// It returns a non nil err if the one of the socket addresses are invalid.
//
// Target is used internally by the TransportModule to identify the client, it should follow
// this form: "client/<target>"
func NewClient(target abstraction.TransportTarget, socket network.Socket) (*TCPClient, error) {
laddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", socket.SrcIP, socket.SrcPort))
if err != nil {
Expand Down Expand Up @@ -76,30 +93,40 @@ func NewClient(target abstraction.TransportTarget, socket network.Socket) (*TCPC
}, nil
}

func (client *TCPClient) SetTimeout(timeout *time.Duration) {
if timeout != nil {
client.dialer.Timeout = *timeout
} else {
client.dialer.Timeout = 0
}
// SetTimeout sets the timeout for the connection to happen, if the connection
// isn't established before this time, it will generate an error and attempt to reconnect
//
// A timeout of 0 means no timeout
func (client *TCPClient) SetTimeout(timeout time.Duration) {
client.dialer.Timeout = timeout
}

func (client *TCPClient) SetKeepalive(keepalive *time.Duration) {
if keepalive != nil {
client.dialer.KeepAlive = *keepalive
} else {
client.dialer.KeepAlive = -1
}
// SetKeepalive sets the keepalive period for the connecion once it is established.
//
// A keepalive of 0 is a default keepalive (15s) and a negative keepalive disables it.
func (client *TCPClient) SetKeepalive(keepalive time.Duration) {
client.dialer.KeepAlive = keepalive
}

// SetOnConnection registers the callback that will be used when a connection is made
func (client *TCPClient) SetOnConnection(callback connectionCallback) {
client.onConnection = callback
}

// SetOnError registers the callback that will be used when an error making the connection occurs.
func (client *TCPClient) SetOnError(callback errorCallback) {
client.onError = callback
}

// Run starts the connection of the client.
//
// If the connection fails, the client tries to reconnect using an exponential backoff algorithmn.
//
// Errors and connections are reported using the OnError and OnConnection callbacks, and should be
// set before calling Run.
//
// Run will block, callers should run this as a goroutine. The execution can be halted by sending a
// message to the cancel message.
func (client *TCPClient) Run(cancel <-chan struct{}) {
client.retry.reset()
for {
Expand Down

0 comments on commit e6fcdfa

Please sign in to comment.