Skip to content

Commit

Permalink
Fix bug that causes resourse closing failure
Browse files Browse the repository at this point in the history
  • Loading branch information
yosebyte authored Dec 16, 2024
1 parent 80eaa93 commit ece70ec
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 139 deletions.
1 change: 0 additions & 1 deletion internal/tunnel/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func Client(parsedURL *url.URL) error {
}
defer linkConn.Close()
if err := linkConn.Handshake(); err != nil {
linkConn.Close()
return err
}
log.Info("Tunnel connection established to: [%v]", linkAddr)
Expand Down
11 changes: 7 additions & 4 deletions internal/tunnel/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,28 @@ func Server(parsedURL *url.URL, whiteList *sync.Map, tlsConfig *tls.Config) erro
log.Info("Tunnel connection established from: [%v]", linkConn.RemoteAddr().String())
var sharedMU sync.Mutex
errChan := make(chan error, 2)
done := make(chan struct{})
go func() {
for {
time.Sleep(internal.MaxReportInterval * time.Second)
sharedMU.Lock()
_, err = linkTLS.Write([]byte("[REPORT]\n"))
sharedMU.Unlock()
if err != nil {
log.Error("TLS connection health check failed: %v", err)
log.Error("Tunnel connection health check failed: %v", err)
linkTLS.Close()
linkListen.Close()
close(done)
errChan <- err
return
break
}
}
}()
go func() {
errChan <- ServeTCP(parsedURL, whiteList, linkAddr, targetTCPAddr, linkListen, linkTLS, &sharedMU)
errChan <- ServeTCP(parsedURL, whiteList, linkAddr, targetTCPAddr, linkListen, linkTLS, &sharedMU, done)
}()
go func() {
errChan <- ServeUDP(parsedURL, whiteList, linkAddr, targetUDPAddr, linkListen, linkTLS, &sharedMU)
errChan <- ServeUDP(parsedURL, whiteList, linkAddr, targetUDPAddr, linkListen, linkTLS, &sharedMU, done)
}()
return <-errChan
}
136 changes: 72 additions & 64 deletions internal/tunnel/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,75 +13,83 @@ import (
"github.com/yosebyte/passport/pkg/log"
)

func ServeTCP(parsedURL *url.URL, whiteList *sync.Map, linkAddr, targetAddr *net.TCPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex) error {
targetListen, err := net.ListenTCP("tcp", targetAddr)
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
defer targetListen.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
func ServeTCP(parsedURL *url.URL, whiteList *sync.Map, linkAddr, targetAddr *net.TCPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex, done <-chan struct{}) error {
for {
targetConn, err := targetListen.AcceptTCP()
if err != nil {
log.Error("Unable to accept connections form target address: [%v] %v", targetAddr, err)
time.Sleep(1 * time.Second)
continue
}
clientAddr := targetConn.RemoteAddr().String()
log.Info("Target connection established from: [%v]", clientAddr)
if parsedURL.Fragment != "" {
clientIP, _, err := net.SplitHostPort(clientAddr)
if err != nil {
log.Error("Unable to extract client IP address: [%v] %v", clientAddr, err)
targetConn.Close()
time.Sleep(1 * time.Second)
continue
}
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
targetConn.Close()
continue
}
}
sem <- struct{}{}
go func(targetConn *net.TCPConn) {
defer func() { <-sem }()
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<TCP>\n"))
mu.Unlock()
select {
case <-done:
log.Warn("TCP server received shutdown signal")
return nil
default:
targetListen, err := net.ListenTCP("tcp", targetAddr)
if err != nil {
log.Error("Unable to send signal: %v", err)
targetConn.Close()
return
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections form link address: [%v] %v", linkAddr, err)
return
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
targetConn.Close()
remoteConn.Close()
return
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
targetConn.Close()
remoteTLS.Close()
return
}
log.Info("Starting data exchange: [%v] <-> [%v]", clientAddr, targetAddr)
if err := conn.DataExchange(remoteTLS, targetConn); err != nil {
if err == io.EOF {
log.Info("Connection closed successfully: %v", err)
} else {
log.Warn("Connection closed unexpectedly: %v", err)
defer targetListen.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
targetConn, err := targetListen.AcceptTCP()
if err != nil {
log.Error("Unable to accept connections form target address: [%v] %v", targetAddr, err)
time.Sleep(1 * time.Second)
continue
}
clientAddr := targetConn.RemoteAddr().String()
log.Info("Target connection established from: [%v]", clientAddr)
if parsedURL.Fragment != "" {
clientIP, _, err := net.SplitHostPort(clientAddr)
if err != nil {
log.Error("Unable to extract client IP address: [%v] %v", clientAddr, err)
targetConn.Close()
time.Sleep(1 * time.Second)
continue
}
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
targetConn.Close()
continue
}
}
sem <- struct{}{}
go func(targetConn *net.TCPConn) {
defer func() { <-sem }()
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<TCP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
targetConn.Close()
return
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections form link address: [%v] %v", linkAddr, err)
return
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
targetConn.Close()
remoteConn.Close()
return
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
targetConn.Close()
remoteTLS.Close()
return
}
log.Info("Starting data exchange: [%v] <-> [%v]", clientAddr, targetAddr)
if err := conn.DataExchange(remoteTLS, targetConn); err != nil {
if err == io.EOF {
log.Info("Connection closed successfully: %v", err)
} else {
log.Warn("Connection closed unexpectedly: %v", err)
}
}
}(targetConn)
}
}(targetConn)
}
}
}

Expand Down
148 changes: 78 additions & 70 deletions internal/tunnel/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,88 @@ import (
"github.com/yosebyte/passport/pkg/log"
)

func ServeUDP(parsedURL *url.URL, whiteList *sync.Map, linkAddr *net.TCPAddr, targetAddr *net.UDPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex) error {
targetConn, err := net.ListenUDP("udp", targetAddr)
if err != nil {
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
defer targetConn.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
func ServeUDP(parsedURL *url.URL, whiteList *sync.Map, linkAddr *net.TCPAddr, targetAddr *net.UDPAddr, linkListen net.Listener, linkTLS *tls.Conn, mu *sync.Mutex, done <-chan struct{}) error {
for {
buffer := make([]byte, internal.MaxDataBuffer)
n, clientAddr, err := targetConn.ReadFromUDP(buffer)
if err != nil {
log.Error("Unable to read from client address: [%v] %v", clientAddr, err)
time.Sleep(1 * time.Second)
continue
}
if parsedURL.Fragment != "" {
clientIP := clientAddr.IP.String()
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
continue
}
}
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<UDP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
time.Sleep(1 * time.Second)
continue
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections from link address: [%v] %v", linkAddr, err)
time.Sleep(1 * time.Second)
continue
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
remoteConn.Close()
time.Sleep(1 * time.Second)
continue
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
remoteTLS.Close()
time.Sleep(1 * time.Second)
continue
}
sem <- struct{}{}
go func(buffer []byte, n int, remoteTLS *tls.Conn, clientAddr *net.UDPAddr) {
defer func() {
<-sem
remoteTLS.Close()
}()
log.Info("Starting data transfer: [%v] <-> [%v]", clientAddr, targetAddr)
_, err = remoteTLS.Write(buffer[:n])
select {
case <-done:
log.Warn("UDP server received shutdown signal")
return nil
default:
targetConn, err := net.ListenUDP("udp", targetAddr)
if err != nil {
log.Error("Unable to write to link address: [%v] %v", linkAddr, err)
return
log.Error("Unable to listen target address: [%v]", targetAddr)
return err
}
n, err = remoteTLS.Read(buffer)
if err != nil {
log.Error("Unable to read from link address: [%v] %v", linkAddr, err)
return
defer targetConn.Close()
sem := make(chan struct{}, internal.MaxSemaphoreLimit)
for {
buffer := make([]byte, internal.MaxDataBuffer)
n, clientAddr, err := targetConn.ReadFromUDP(buffer)
if err != nil {
log.Error("Unable to read from client address: [%v] %v", clientAddr, err)
time.Sleep(1 * time.Second)
continue
}
if parsedURL.Fragment != "" {
clientIP := clientAddr.IP.String()
if _, exists := whiteList.Load(clientIP); !exists {
log.Warn("Unauthorized IP address blocked: [%v]", clientIP)
continue
}
}
mu.Lock()
_, err = linkTLS.Write([]byte("[PASSPORT]<UDP>\n"))
mu.Unlock()
if err != nil {
log.Error("Unable to send signal: %v", err)
time.Sleep(1 * time.Second)
continue
}
remoteConn, err := linkListen.Accept()
if err != nil {
log.Error("Unable to accept connections from link address: [%v] %v", linkAddr, err)
time.Sleep(1 * time.Second)
continue
}
remoteTLS, ok := remoteConn.(*tls.Conn)
if !ok {
log.Error("Non-TLS connection received")
remoteConn.Close()
time.Sleep(1 * time.Second)
continue
}
if err := remoteTLS.Handshake(); err != nil {
log.Error("TLS handshake failed: %v", err)
remoteTLS.Close()
time.Sleep(1 * time.Second)
continue
}
sem <- struct{}{}
go func(buffer []byte, n int, remoteTLS *tls.Conn, clientAddr *net.UDPAddr) {
defer func() {
<-sem
remoteTLS.Close()
}()
log.Info("Starting data transfer: [%v] <-> [%v]", clientAddr, targetAddr)
_, err = remoteTLS.Write(buffer[:n])
if err != nil {
log.Error("Unable to write to link address: [%v] %v", linkAddr, err)
return
}
n, err = remoteTLS.Read(buffer)
if err != nil {
log.Error("Unable to read from link address: [%v] %v", linkAddr, err)
return
}
_, err = targetConn.WriteToUDP(buffer[:n], clientAddr)
if err != nil {
log.Error("Unable to write to client address: [%v] %v", clientAddr, err)
return
}
log.Info("Transfer completed successfully")
}(buffer, n, remoteTLS, clientAddr)
}
_, err = targetConn.WriteToUDP(buffer[:n], clientAddr)
if err != nil {
log.Error("Unable to write to client address: [%v] %v", clientAddr, err)
return
}
log.Info("Transfer completed successfully")
}(buffer, n, remoteTLS, clientAddr)
}
}
}

Expand Down

0 comments on commit ece70ec

Please sign in to comment.