Skip to content

Commit

Permalink
Correct tcp echo server, improve udp server memory usage and add echo id
Browse files Browse the repository at this point in the history
  • Loading branch information
lzeng committed Jan 9, 2025
1 parent a82844b commit 49529f3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 91 deletions.
36 changes: 26 additions & 10 deletions server/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

const (
CONNECTION_IDLE_TIMEOUTMS = 30000
MESSAGE_DATA_LENGTH = 32
)

var (
Expand All @@ -25,12 +26,20 @@ const (
UDP_CONN
)

type MessageData struct {
MagicHeader [4]byte
Cmd [4]byte
Args [4]byte
Reserved [4]byte
Checksum [16]byte
}

type ConnData struct {
// atomic data, keep 64bit(8-bytes) alignment for 32-bit system compatibility
InitTime int64 // local connection setup time. immutable after created
InitTime int64 // local connection setup time. fixed after created
LastRemoteSendTime int64
LastLocalSendTime int64
LastLocalRecvTime int64
echoId int64

sync.Mutex
sync.WaitGroup
Expand All @@ -52,20 +61,27 @@ func (conn *ConnData) Close() {
return
}
conn.nativeConn.Close()
close(conn.echoQueue)
conn.closed.Store(true)
}

type TupleData struct {
SrcIp string
DstIp string
SrcPort int
DstPort int
Timestamp time.Time
msg string
EchoId int64
SrcIp string
DstIp string
SrcPort int
DstPort int
Timestamp time.Time
echoData []byte
serializeOnce sync.Once
serializedStr string
}

func (td *TupleData) String() string {
return fmt.Sprintf("%s %s:%d -> %s:%d \"%s\"", td.Timestamp.Format("2006-01-02 15:04:05"), td.SrcIp, td.SrcPort, td.DstIp, td.DstPort, td.msg)
func (td *TupleData) Serialize() []byte {
td.serializeOnce.Do(func() {
td.serializedStr = fmt.Sprintf("[%d %s] %s:%d -> %s:%d \"%s\"", td.EchoId, td.Timestamp.Format("2006-01-02 15:04:05"), td.SrcIp, td.SrcPort, td.DstIp, td.DstPort, string(td.echoData))
})
return []byte(td.serializedStr)
}

// need external connection
Expand Down
127 changes: 60 additions & 67 deletions server/tcpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,102 +108,94 @@ func (s *TcpServer) recvPacketRoutine() {
continue
}

// incoming connection
remoteAddr := tcpConn.RemoteAddr().(*net.TCPAddr)
addrStr := remoteAddr.String()
// allocate a new packet buffer for every read
pkt := make([]byte, 4096)
conn := &ConnData{
InitTime: time.Now().UnixMicro(),
LocalAddr: s.listenAddr,
RemoteAddr: remoteAddr,
nativeConn: tcpConn,
connType: TCP_CONN,
idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS,
echoQueue: make(chan *TupleData, 4096),
}
// record new connection
s.remoteConnectionMapMutex.Lock()
s.remoteConnectionMap[addrStr] = conn
s.remoteConnectionMapMutex.Unlock()

// launch connection routine
s.wg.Add(1)
go s.connectionRoutine(conn)
}
}

func (s *TcpServer) connectionRoutine(conn *ConnData) {
defer s.wg.Done()
// stop connection and clean up
defer func() {
// remove conn from remoteConnectionMap
s.remoteConnectionMapMutex.Lock()
delete(s.remoteConnectionMap, conn.RemoteAddr.String())
s.remoteConnectionMapMutex.Unlock()
conn.Close()
}()

s.wg.Add(1)
go s.echoRoutine(conn)

// allocate a common packet buffer for every read
pkt := make([]byte, MESSAGE_DATA_LENGTH)

for {
select {
case <-s.signals.stop:
return
default:
}

// tcp recv, blocking until packet arrives or conn.Close()
tcpConn.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := tcpConn.Read(pkt[:])
conn.nativeConn.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := conn.nativeConn.Read(pkt[:])
if err != nil {
pkt = nil
//log.Error("Read from UDP error: %v\n", err)
//log.Error("Read from TCP error: %v\n", err)
if n == 0 {
// listenConn closed
// tcpConn closed
return
}
continue
}

recvTime := time.Now()

// add total recv bytes
recvTime := time.Now()
atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n))

atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro())
//log.Trace("receive udp packet (%s -> %s): %+v", addrStr, s.listenAddr.String(), pkt.Packet)
//log.Info("Receive [%s] packet (%s -> %s), %d bytes", msgType, addrStr, s.listenAddr.String(), n)

s.remoteConnectionMapMutex.Lock()
conn, found := s.remoteConnectionMap[addrStr]
s.remoteConnectionMapMutex.Unlock()

tuple := new(TupleData)
tuple.msg = string(pkt)
tuple.SrcIp = remoteAddr.IP.String()
tuple.SrcPort = remoteAddr.Port
tuple.echoData = make([]byte, n)
copy(tuple.echoData, pkt[:n])
tuple.SrcIp = conn.RemoteAddr.(*net.TCPAddr).IP.String()
tuple.SrcPort = conn.RemoteAddr.(*net.TCPAddr).Port
tuple.DstIp = ListenIp
tuple.DstPort = ListenPort
tuple.Timestamp = recvTime

if found {
// existing connection
atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro())
conn.echoQueue <- tuple

} else {
// create new connection
conn = &ConnData{
InitTime: recvTime.UnixMicro(),
LocalAddr: s.listenAddr,
RemoteAddr: remoteAddr,
nativeConn: tcpConn,
connType: TCP_CONN,
idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS,
}
conn.echoQueue = make(chan *TupleData)
// setup new routine for connection
s.remoteConnectionMapMutex.Lock()
s.remoteConnectionMap[addrStr] = conn
s.remoteConnectionMapMutex.Unlock()

conn.echoQueue <- tuple

//log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String())

// launch connection routine
s.wg.Add(1)
go s.connectionRoutine(conn)
}
tuple.EchoId = atomic.AddInt64(&conn.echoId, 1)
conn.echoQueue <- tuple
//log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String())
}
}

func (s *TcpServer) connectionRoutine(conn *ConnData) {
addrStr := conn.RemoteAddr.String()
func (s *TcpServer) echoRoutine(conn *ConnData) {
defer s.wg.Done()
//defer log.Debug("Connection routine: %s stopped", addrStr)

//log.Debug("Connection routine: %s started", addrStr)

// stop receiving packets and clean up
defer func() {
// remove conn from remoteConnectionMap
s.remoteConnectionMapMutex.Lock()
delete(s.remoteConnectionMap, addrStr)
s.remoteConnectionMapMutex.Unlock()
conn.Close()
}()

for {
select {
case <-s.signals.stop:
return

case <-time.After(time.Duration(conn.idleTimeoutMs) * time.Millisecond):
// timeout, quit routine
//log.Debug("Connection routine idle timeout")
return

case tuple, ok := <-conn.echoQueue:
if !ok {
return
Expand All @@ -213,7 +205,8 @@ func (s *TcpServer) connectionRoutine(conn *ConnData) {
}
//log.Debug("Received udp packet len [%d] from addr: %s\n", len(pkt.Packet), addrStr)
conn.nativeConn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
conn.nativeConn.Write([]byte(tuple.String()))
conn.nativeConn.Write(tuple.Serialize())
atomic.AddUint64(&s.stats.totalSendBytes, uint64(len(tuple.serializedStr)))
}
}
}
28 changes: 14 additions & 14 deletions server/udpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,19 @@ func (s *UdpServer) Stop() {
func (s *UdpServer) recvPacketRoutine() {
defer s.wg.Done()

// allocate a common packet buffer for every read
pkt := make([]byte, 65536)

for {
select {
case <-s.signals.stop:
return

default:
}

// allocate a new packet buffer for every read
pkt := make([]byte, 4096)

// udp recv, blocking until packet arrives or conn.Close()
n, remoteAddr, err := s.listenConn.ReadFromUDP(pkt[:])
n, remoteAddr, err := s.listenConn.ReadFromUDP(pkt)
if err != nil {
pkt = nil
//log.Error("Read from UDP error: %v\n", err)
if n == 0 {
// listenConn closed
Expand All @@ -112,9 +110,8 @@ func (s *UdpServer) recvPacketRoutine() {
addrStr := remoteAddr.String()

// add total recv bytes
atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n))

recvTime := time.Now()
atomic.AddUint64(&s.stats.totalRecvBytes, uint64(n))
//log.Trace("receive udp packet (%s -> %s): %+v", addrStr, s.listenAddr.String(), pkt.Packet)
//log.Info("Receive [%s] packet (%s -> %s), %d bytes", msgType, addrStr, s.listenAddr.String(), n)

Expand All @@ -123,7 +120,8 @@ func (s *UdpServer) recvPacketRoutine() {
s.remoteConnectionMapMutex.Unlock()

tuple := new(TupleData)
tuple.msg = string(pkt)
tuple.echoData = make([]byte, n)
copy(tuple.echoData, pkt[:n])
tuple.SrcIp = remoteAddr.IP.String()
tuple.SrcPort = remoteAddr.Port
tuple.DstIp = ListenIp
Expand All @@ -133,8 +131,8 @@ func (s *UdpServer) recvPacketRoutine() {
if found {
// existing connection
atomic.StoreInt64(&conn.LastLocalRecvTime, recvTime.UnixMicro())
tuple.EchoId = atomic.AddInt64(&conn.echoId, 1)
conn.echoQueue <- tuple

} else {
// create new connection
conn = &ConnData{
Expand All @@ -144,15 +142,16 @@ func (s *UdpServer) recvPacketRoutine() {
nativeConn: s.listenConn,
connType: UDP_CONN,
idleTimeoutMs: CONNECTION_IDLE_TIMEOUTMS,
echoQueue: make(chan *TupleData, 4096),
}
conn.echoQueue = make(chan *TupleData)
// setup new routine for connection

// record new connection
s.remoteConnectionMapMutex.Lock()
s.remoteConnectionMap[addrStr] = conn
s.remoteConnectionMapMutex.Unlock()

tuple.EchoId = atomic.AddInt64(&conn.echoId, 1)
conn.echoQueue <- tuple

//log.Info("Accept new UDP connection from %s to %s", addrStr, s.listenAddr.String())

// launch connection routine
Expand Down Expand Up @@ -197,7 +196,8 @@ func (s *UdpServer) connectionRoutine(conn *ConnData) {
}
//log.Debug("Received udp packet len [%d] from addr: %s\n", len(pkt.Packet), addrStr)
conn.nativeConn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
conn.nativeConn.Write([]byte(tuple.String()))
conn.nativeConn.Write(tuple.Serialize())
atomic.AddUint64(&s.stats.totalSendBytes, uint64(len(tuple.serializedStr)))
}
}
}

0 comments on commit 49529f3

Please sign in to comment.