Skip to content

Commit

Permalink
Merge pull request #654 from abronan/mitigate-stuck-dial-on-initiator…
Browse files Browse the repository at this point in the history
…-stop

Fix stuck call to Dial when calling Stop on the Initiator
  • Loading branch information
ackleymi authored Aug 9, 2024
2 parents c07597e + 3939268 commit e92fa68
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
18 changes: 16 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/quickfixgo/quickfix/config"
)

func loadDialerConfig(settings *SessionSettings) (dialer proxy.Dialer, err error) {
func loadDialerConfig(settings *SessionSettings) (dialer proxy.ContextDialer, err error) {
stdDialer := &net.Dialer{}
if settings.HasSetting(config.SocketTimeout) {
timeout, err := settings.DurationSetting(config.SocketTimeout)
Expand Down Expand Up @@ -73,9 +73,23 @@ func loadDialerConfig(settings *SessionSettings) (dialer proxy.Dialer, err error
}
}

dialer, err = proxy.SOCKS5("tcp", fmt.Sprintf("%s:%d", proxyHost, proxyPort), proxyAuth, dialer)
var proxyDialer proxy.Dialer

proxyDialer, err = proxy.SOCKS5("tcp", fmt.Sprintf("%s:%d", proxyHost, proxyPort), proxyAuth, stdDialer)
if err != nil {
return
}

if contextDialer, ok := proxyDialer.(proxy.ContextDialer); ok {
dialer = contextDialer
} else {
err = fmt.Errorf("proxy does not support context dialer")
return
}

default:
err = fmt.Errorf("unsupported proxy type %s", proxyType)
}

return
}
26 changes: 23 additions & 3 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package quickfix

import (
"bufio"
"context"
"crypto/tls"
"strings"
"sync"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (i *Initiator) Start() (err error) {
return
}

var dialer proxy.Dialer
var dialer proxy.ContextDialer
if dialer, err = loadDialerConfig(settings); err != nil {
return
}
Expand Down Expand Up @@ -142,7 +143,7 @@ func (i *Initiator) waitForReconnectInterval(reconnectInterval time.Duration) bo
return true
}

func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, dialer proxy.Dialer) {
func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, dialer proxy.ContextDialer) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -162,14 +163,27 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
return
}

ctx, cancel := context.WithCancel(context.Background())

// We start a goroutine in order to be able to cancel the dialer mid-connection
// on receiving a stop signal to stop the initiator.
go func() {
select {
case <-i.stopChan:
cancel()
case <-ctx.Done():
return
}
}()

var disconnected chan interface{}
var msgIn chan fixIn
var msgOut chan []byte

address := session.SocketConnectAddress[connectionAttempt%len(session.SocketConnectAddress)]
session.log.OnEventf("Connecting to: %v", address)

netConn, err := dialer.Dial("tcp", address)
netConn, err := dialer.DialContext(ctx, "tcp", address)
if err != nil {
session.log.OnEventf("Failed to connect: %v", err)
goto reconnect
Expand Down Expand Up @@ -208,13 +222,19 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
close(disconnected)
}()

// This ensures we properly cleanup the goroutine and context used for
// dial cancelation after successful connection.
cancel()

select {
case <-disconnected:
case <-i.stopChan:
return
}

reconnect:
cancel()

connectionAttempt++
session.log.OnEventf("Reconnecting in %v", session.ReconnectInterval)
if !i.waitForReconnectInterval(session.ReconnectInterval) {
Expand Down

0 comments on commit e92fa68

Please sign in to comment.