Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Implement TCP connections #16

Merged
merged 13 commits into from
Nov 3, 2023
4 changes: 4 additions & 0 deletions pkg/abstraction/transport.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package abstraction

// TransportTarget uniquely identifies a target resource for the Transport
// module operations
type TransportTarget string

// TransportEvent is an event of the Transport module
type TransportEvent string

Expand Down
3 changes: 0 additions & 3 deletions pkg/transport/network/tcp.go

This file was deleted.

147 changes: 147 additions & 0 deletions pkg/transport/network/tcp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package tcp

import (
"fmt"
"net"
"time"

"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/network"
)

var (
// CLIENT_INITIAL_BACKOFF is the starting value for the client backoff
CLIENT_INITIAL_BACKOFF = time.Millisecond * 100
// CLIENT_MAX_BACKOFF is the max value for the client backoff
CLIENT_MAX_BACKOFF = time.Second * 5
)

// backoff is an abstraction over an exponential backoff algorithmn
type backoff struct {
base time.Duration
current time.Duration
rate float32
max time.Duration
}

// next returns a channel where a message will be sent once the next period
// of time elapses.
func (b *backoff) next() <-chan time.Time {
timer := time.NewTimer(b.current)
return timer.C
}

// reset resets the current value of the backoff to the default one
func (b *backoff) reset() {
b.current = b.base
}

// increase sets the current value of the backoff to the next one
func (b *backoff) increase() {
b.current = time.Duration(float32(b.current) * b.rate)
if b.current > b.max {
b.current = b.max
}
}

// Assertion to check the TCPClient is a TCPSource
var _ Source = &Client{}

// Client is a TCPSource that creates connections with a server.
//
// Client must be used the same way as a regular TCPSource, extra methods provided
// are just to configure certain aspects about it.
type Client struct {
target abstraction.TransportTarget
raddr net.Addr
dialer net.Dialer
onConnection connectionCallback
onError errorCallback
retry backoff
}

// NewClient creates a new TCPClient for the specified socket.
//
// It returns a non nil err if the one of the socket addresses are invalid.
//
// Target is used internally by the TransportModule to identify the client, it should follow
// this form: "client/<target>"
func NewClient(target abstraction.TransportTarget, socket network.Socket) (*Client, error) {
laddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", socket.SrcIP, socket.SrcPort))
if err != nil {
return nil, err
}
raddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", socket.DstIP, socket.DstPort))
if err != nil {
return nil, err
}

return &Client{
target: target,
raddr: raddr,
dialer: net.Dialer{
LocalAddr: laddr,
KeepAlive: -1, // Disable default keepalive (defaults to 15s)
Timeout: 0, // Default value -> No timeout
},
retry: backoff{
base: CLIENT_INITIAL_BACKOFF,
current: CLIENT_INITIAL_BACKOFF,
rate: 2,
max: CLIENT_MAX_BACKOFF,
},
}, nil
}

// SetTimeout sets the timeout for the connection to happen, if the connection
// isn't established before this time, it will generate an error and attempt to reconnect
//
// A timeout of 0 means no timeout
func (client *Client) SetTimeout(timeout time.Duration) {
client.dialer.Timeout = timeout
}

// SetKeepalive sets the keepalive period for the connecion once it is established.
//
// A keepalive of 0 is a default keepalive (15s) and a negative keepalive disables it.
func (client *Client) SetKeepalive(keepalive time.Duration) {
client.dialer.KeepAlive = keepalive
}

// SetOnConnection registers the callback that will be used when a connection is made
func (client *Client) SetOnConnection(callback connectionCallback) {
client.onConnection = callback
}

// SetOnError registers the callback that will be used when an error making the connection occurs.
func (client *Client) SetOnError(callback errorCallback) {
client.onError = callback
}

// Run starts the connection of the client.
//
// If the connection fails, the client tries to reconnect using an exponential backoff algorithmn.
//
// Errors and connections are reported using the OnError and OnConnection callbacks, and should be
// set before calling Run.
//
// Run will block, callers should run this as a goroutine. The execution can be halted by sending a
// message to the cancel message.
func (client *Client) Run(cancel <-chan struct{}) {
client.retry.reset()
for {
backoffTimer := client.retry.next()
select {
case <-backoffTimer:
conn, err := client.dialer.Dial("tcp", client.raddr.String())
if err == nil {
client.onConnection(client.target, conn.(*net.TCPConn))
return
}
client.onError(client.target, err)
client.retry.increase()
case <-cancel:
return
}
}
}
131 changes: 131 additions & 0 deletions pkg/transport/network/tcp/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package tcp

import (
"net"
"time"

"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
)

// Address is an alias for string encoded network addresses (e.g. "127.0.0.1:4040")
type Address = string

// serverTargets are the addresses are expected to connect and their respective target name
type serverTargets = map[Address]abstraction.TransportTarget

// Assertion to check the TCPServer is a TCPSource
var _ Source = &Server{}
andresdlt03 marked this conversation as resolved.
Show resolved Hide resolved

// Server is a TCPSource that gets connections from clients
//
// Server must be used as any other TCPSource.
type Server struct {
name abstraction.TransportTarget
targets serverTargets
listener *net.TCPListener
keepalive time.Duration
onConnection connectionCallback
onError errorCallback
}

// NewServer creates a new TCPServer with the given name, local address and target connections.
// It returns a non nil error if it fails to resolve the local address or when the listener creation fails.
func NewServer(name abstraction.TransportTarget, laddr string, targets serverTargets) (*Server, error) {
localAddr, err := net.ResolveTCPAddr("tcp", laddr)
if err != nil {
return nil, err
}

listener, err := net.ListenTCP("tcp", localAddr)
if err != nil {
return nil, err
}

return &Server{
name: name,
targets: targets,
listener: listener,
keepalive: 0,
}, nil
}

// SetKeepalive sets the keepalive that will be applied to the connections established
func (server *Server) SetKeepalive(keepalive time.Duration) {
server.keepalive = keepalive
}

// SetOnConnection registers the callback that will be used when a connection is made
func (server *Server) SetOnConnection(callback connectionCallback) {
server.onConnection = callback
}

// SetOnError registers the callback that will be used when an error making the connection occurs.
func (server *Server) SetOnError(callback errorCallback) {
server.onError = callback
}

// accept result is an auxiliary struct to pass the results of listener.Accept through a channel
type acceptResult struct {
conn *net.TCPConn
err error
}

// Run starts the TCPServer.
//
// When a conneciton is established, it first checks if it's valid, then configures it and lastly
// notifies of the connection with the OnConnection callback.
//
// Callers should make sure the OnConnection and the OnError callbacks are provided before calling Run.
//
// The server execution can be stopped by sending a message to the cancel channel.
func (server *Server) Run(cancel <-chan struct{}) {
defer server.listener.Close()
for {
acceptChan := make(chan acceptResult)

go func(acceptChan chan<- acceptResult) {
conn, err := server.listener.AcceptTCP()
acceptChan <- acceptResult{conn, err}
}(acceptChan)

var conn *net.TCPConn
var err error
select {
case result := <-acceptChan:
conn = result.conn
err = result.err
case <-cancel:
return
}

if err != nil {
server.onError(server.name, err)
continue
}

target, ok := server.targets[Address(conn.RemoteAddr().String())]
if !ok {
conn.Close()
continue
}

err = server.configureConn(conn)
if err != nil {
conn.Close()
server.onError(server.name, err)
continue
}

server.onConnection(target, conn)
}
}

// configureConn is a helper to apply all configuration to a connection.
func (server *Server) configureConn(conn *net.TCPConn) error {
err := conn.SetKeepAlive(server.keepalive > 0)
if err != nil {
return err
}

return conn.SetKeepAlivePeriod(server.keepalive)
}
87 changes: 87 additions & 0 deletions pkg/transport/network/tcp/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package tcp

import (
"net"

"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
)

// These callbacks are used by the Source and the Conn structs
// connectionCallback is called when a source has obtained a new connection
type connectionCallback = func(abstraction.TransportTarget, *net.TCPConn)

// errorCallback is called when a source encounters an error creating a connection
type errorCallback = func(abstraction.TransportTarget, error)

// stateCallback is called when a Conn connection state changes
type stateCallback = func(abstraction.TransportTarget, bool)

// Source is an interface for different ways of creating TCP connections.
//
// When using sources the caller should first use SetOnConnection and SetOnError
// to ensure both that connections are created correctly and any errors are handled.
//
// Once the callbacks are set, the consumer should execute Run in a goroutine, passing
// an optional channel to stop the execution of the Source
type Source interface {
// Run starts sourcing connections. Implementations might block, so it is advised
// to call this in a goroutine to prevent blocking.
//
// The provided channel is used to stop the runner, when a message is sent the
// Source stops creating connections.
Run(<-chan struct{})

// SetOnConnection gives the Source the callback it should call when a new connection
// is established.
SetOnConnection(connectionCallback)

// SetOnError gives the Source the callback it should call when an error making a
// connection is encountered.
SetOnError(errorCallback)
}

// Conn is a wrapper around a net.TCPConn which gives it an associated target and
// other callbacks
type Conn struct {
target abstraction.TransportTarget
conn *net.TCPConn
onConnUpdate stateCallback
}

// NewConn creates a new Conn with the provided parameters
//
// target is the TransportTarget associated with this Conn
//
// conn is the actual network connection
//
// callback is the callback that will be called when the conn connects / disconnects
func NewConn(target abstraction.TransportTarget, conn *net.TCPConn, callback stateCallback) *Conn {
callback(target, true)
return &Conn{
target: target,
conn: conn,
onConnUpdate: callback,
}
}

// Target returns the connection TransportTarget
func (tcp *Conn) Target() abstraction.TransportTarget {
return tcp.target
}

// Read maps the underlying conn Read method
func (tcp *Conn) Read(p []byte) (n int, err error) {
return tcp.conn.Read(p)
}

// Write maps the underlying conn Write method
func (tcp *Conn) Write(p []byte) (n int, err error) {
return tcp.conn.Write(p)
}

// Close maps the unterlying conn Close method, calling the appropiate callbacks
// in the process.
func (tcp *Conn) Close() error {
tcp.onConnUpdate(tcp.target, false)
return tcp.conn.Close()
}
3 changes: 2 additions & 1 deletion pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/HyperloopUPV-H8/h9-backend/pkg/abstraction"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/network"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/network/sniffer"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/network/tcp"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/presentation"
"github.com/HyperloopUPV-H8/h9-backend/pkg/transport/session"
)
Expand All @@ -21,7 +22,7 @@ type Transport struct {
conversations map[network.Socket]*session.SocketBuffer

sniffer *sniffer.Sniffer
boards map[abstraction.BoardId]*network.TCPConn
boards map[abstraction.BoardId]*tcp.Conn

tftp *network.TFTPConn

Expand Down