Skip to content

Commit

Permalink
Bug fix for unclosed listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
yosebyte authored Dec 16, 2024
1 parent ece70ec commit 4438e24
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 140 deletions.
31 changes: 17 additions & 14 deletions internal/tunnel/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,7 @@ func Server(parsedURL *url.URL, whiteList *sync.Map, tlsConfig *tls.Config) erro
errChan := make(chan error, 2)
done := make(chan struct{})
go func() {
for {
time.Sleep(internal.MaxReportInterval * time.Second)
sharedMU.Lock()
_, err = linkTLS.Write([]byte("[REPORT]\n"))
sharedMU.Unlock()
if err != nil {
log.Error("Tunnel connection health check failed: %v", err)
linkTLS.Close()
linkListen.Close()
close(done)
errChan <- err
break
}
}
errChan <- healthCheck(linkListen, linkTLS, &sharedMU, done)
}()
go func() {
errChan <- ServeTCP(parsedURL, whiteList, linkAddr, targetTCPAddr, linkListen, linkTLS, &sharedMU, done)
Expand All @@ -78,3 +65,19 @@ func Server(parsedURL *url.URL, whiteList *sync.Map, tlsConfig *tls.Config) erro
}()
return <-errChan
}

func healthCheck(linkListen net.Listener, linkTLS *tls.Conn, sharedMU *sync.Mutex, done chan struct{}) error {
for {
time.Sleep(internal.MaxReportInterval * time.Second)
sharedMU.Lock()
_, err := linkTLS.Write([]byte("[REPORT]\n"))
sharedMU.Unlock()
if err != nil {
log.Error("Tunnel connection health check failed: %v", err)
linkTLS.Close()
linkListen.Close()
close(done)
return err
}
}
}
120 changes: 59 additions & 61 deletions internal/tunnel/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,81 +14,79 @@ import (
)

func ServeTCP(parsedURL *url.URL, whiteList *sync.Map, linkAddr, targetAddr *net.TCPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex, done <-chan struct{}) error {
targetListen, err := net.ListenTCP("tcp", targetAddr)
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
defer targetListen.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
select {
case <-done:
log.Warn("TCP server received shutdown signal")
return nil
default:
targetListen, err := net.ListenTCP("tcp", targetAddr)
targetConn, err := targetListen.AcceptTCP()
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
log.Error("Unable to accept connections form target address: [%v] %v", targetAddr, err)
time.Sleep(1 * time.Second)
continue
}
defer targetListen.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
targetConn, err := targetListen.AcceptTCP()
clientAddr := targetConn.RemoteAddr().String()
log.Info("Target connection established from: [%v]", clientAddr)
if parsedURL.Fragment != "" {
clientIP, _, err := net.SplitHostPort(clientAddr)
if err != nil {
log.Error("Unable to accept connections form target address: [%v] %v", targetAddr, err)
log.Error("Unable to extract client IP address: [%v] %v", clientAddr, err)
targetConn.Close()
time.Sleep(1 * time.Second)
continue
}
clientAddr := targetConn.RemoteAddr().String()
log.Info("Target connection established from: [%v]", clientAddr)
if parsedURL.Fragment != "" {
clientIP, _, err := net.SplitHostPort(clientAddr)
if err != nil {
log.Error("Unable to extract client IP address: [%v] %v", clientAddr, err)
targetConn.Close()
time.Sleep(1 * time.Second)
continue
}
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
targetConn.Close()
continue
}
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
targetConn.Close()
continue
}
sem <- struct{}{}
go func(targetConn *net.TCPConn) {
defer func() { <-sem }()
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<TCP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
targetConn.Close()
return
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections form link address: [%v] %v", linkAddr, err)
return
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
targetConn.Close()
remoteConn.Close()
return
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
targetConn.Close()
remoteTLS.Close()
return
}
log.Info("Starting data exchange: [%v] <-> [%v]", clientAddr, targetAddr)
if err := conn.DataExchange(remoteTLS, targetConn); err != nil {
if err == io.EOF {
log.Info("Connection closed successfully: %v", err)
} else {
log.Warn("Connection closed unexpectedly: %v", err)
}
}
}(targetConn)
}
sem <- struct{}{}
go func(targetConn *net.TCPConn) {
defer func() { <-sem }()
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<TCP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
targetConn.Close()
return
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections form link address: [%v] %v", linkAddr, err)
return
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
targetConn.Close()
remoteConn.Close()
return
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
targetConn.Close()
remoteTLS.Close()
return
}
log.Info("Starting data exchange: [%v] <-> [%v]", clientAddr, targetAddr)
if err := conn.DataExchange(remoteTLS, targetConn); err != nil {
if err == io.EOF {
log.Info("Connection closed successfully: %v", err)
} else {
log.Warn("Connection closed unexpectedly: %v", err)
}
}
}(targetConn)
}
}
}
Expand Down
128 changes: 63 additions & 65 deletions internal/tunnel/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,86 +12,84 @@ import (
)

func ServeUDP(parsedURL *url.URL, whiteList *sync.Map, linkAddr *net.TCPAddr, targetAddr *net.UDPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex, done <-chan struct{}) error {
targetConn, err := net.ListenUDP("udp", targetAddr)
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
defer targetConn.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
select {
case <-done:
log.Warn("UDP server received shutdown signal")
return nil
default:
targetConn, err := net.ListenUDP("udp", targetAddr)
buffer := make([]byte, internal.MaxDataBuffer)
n, clientAddr, err := targetConn.ReadFromUDP(buffer)
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
log.Error("Unable to read from client address: [%v] %v", clientAddr, err)
time.Sleep(1 * time.Second)
continue
}
defer targetConn.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
buffer := make([]byte, internal.MaxDataBuffer)
n, clientAddr, err := targetConn.ReadFromUDP(buffer)
if err != nil {
log.Error("Unable to read from client address: [%v] %v", clientAddr, err)
time.Sleep(1 * time.Second)
if parsedURL.Fragment != "" {
clientIP := clientAddr.IP.String()
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
continue
}
if parsedURL.Fragment != "" {
clientIP := clientAddr.IP.String()
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
continue
}
}
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<UDP>\n"))
mu.Unlock()
}
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<UDP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
time.Sleep(1 * time.Second)
continue
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections from link address: [%v] %v", linkAddr, err)
time.Sleep(1 * time.Second)
continue
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
remoteConn.Close()
time.Sleep(1 * time.Second)
continue
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
remoteTLS.Close()
time.Sleep(1 * time.Second)
continue
}
sem <- struct{}{}
go func(buffer []byte, n int, remoteTLS *tls.Conn, clientAddr *net.UDPAddr) {
defer func() {
<-sem
remoteTLS.Close()
}()
log.Info("Starting data transfer: [%v] <-> [%v]", clientAddr, targetAddr)
_, err = remoteTLS.Write(buffer[:n])
if err != nil {
log.Error("Unable to send signal: %v", err)
time.Sleep(1 * time.Second)
continue
log.Error("Unable to write to link address: [%v] %v", linkAddr, err)
return
}
remoteConn, err := linkListen.Accept()
n, err = remoteTLS.Read(buffer)
if err != nil {
log.Error("Unable to accept connections from link address: [%v] %v", linkAddr, err)
time.Sleep(1 * time.Second)
continue
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
remoteConn.Close()
time.Sleep(1 * time.Second)
continue
log.Error("Unable to read from link address: [%v] %v", linkAddr, err)
return
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
remoteTLS.Close()
time.Sleep(1 * time.Second)
continue
_, err = targetConn.WriteToUDP(buffer[:n], clientAddr)
if err != nil {
log.Error("Unable to write to client address: [%v] %v", clientAddr, err)
return
}
sem <- struct{}{}
go func(buffer []byte, n int, remoteTLS *tls.Conn, clientAddr *net.UDPAddr) {
defer func() {
<-sem
remoteTLS.Close()
}()
log.Info("Starting data transfer: [%v] <-> [%v]", clientAddr, targetAddr)
_, err = remoteTLS.Write(buffer[:n])
if err != nil {
log.Error("Unable to write to link address: [%v] %v", linkAddr, err)
return
}
n, err = remoteTLS.Read(buffer)
if err != nil {
log.Error("Unable to read from link address: [%v] %v", linkAddr, err)
return
}
_, err = targetConn.WriteToUDP(buffer[:n], clientAddr)
if err != nil {
log.Error("Unable to write to client address: [%v] %v", clientAddr, err)
return
}
log.Info("Transfer completed successfully")
}(buffer, n, remoteTLS, clientAddr)
}
log.Info("Transfer completed successfully")
}(buffer, n, remoteTLS, clientAddr)
}
}
}
Expand Down

0 comments on commit 4438e24

Please sign in to comment.