Skip to content

Commit

Permalink
Fix stuck call to Dial when calling Stop on the initiator
Browse files Browse the repository at this point in the history
This commit fixes an issue when calling Start() and then
Stop() on the initiator while the connection is likely
to fail and timeout. Sending a SIGTERM will fail since
Dial will attempt to connect until it times out and returns
on the 'waitForReconnectInterval' call.

We mitigate this problem by using a proxy.ContextDialer and
allowing to pass a context with cancellation method to the
dialer.DialContext method on 'handleConnection'.

We need to start a routine listening for the stopChan in
order to call cancel() explicitly and thus exit the DialContext
method.

Note: there are scenarios where cancel() will be called twice,
this choice was made in order to avoid a larger refactor of the
reconnect logic, but since the call to cancel() is idempotent,
this doesn't lead to any adverse effect.

fixes #653

Signed-off-by: Alexandre Beslic <[email protected]>
  • Loading branch information
abronan committed Jul 15, 2024
1 parent e3a2994 commit 5121af8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
18 changes: 16 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/quickfixgo/quickfix/config"
)

func loadDialerConfig(settings *SessionSettings) (dialer proxy.Dialer, err error) {
func loadDialerConfig(settings *SessionSettings) (dialer proxy.ContextDialer, err error) {
stdDialer := &net.Dialer{}
if settings.HasSetting(config.SocketTimeout) {
timeout, err := settings.DurationSetting(config.SocketTimeout)
Expand Down Expand Up @@ -73,9 +73,23 @@ func loadDialerConfig(settings *SessionSettings) (dialer proxy.Dialer, err error
}
}

dialer, err = proxy.SOCKS5("tcp", fmt.Sprintf("%s:%d", proxyHost, proxyPort), proxyAuth, dialer)
var proxyDialer proxy.Dialer

proxyDialer, err = proxy.SOCKS5("tcp", fmt.Sprintf("%s:%d", proxyHost, proxyPort), proxyAuth, stdDialer)
if err != nil {
return
}

if contextDialer, ok := proxyDialer.(proxy.ContextDialer); ok {
dialer = contextDialer
} else {
err = fmt.Errorf("proxy does not support context dialer")
return
}

default:
err = fmt.Errorf("unsupported proxy type %s", proxyType)
}

return
}
26 changes: 23 additions & 3 deletions initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package quickfix

import (
"bufio"
"context"
"crypto/tls"
"strings"
"sync"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (i *Initiator) Start() (err error) {
return
}

var dialer proxy.Dialer
var dialer proxy.ContextDialer
if dialer, err = loadDialerConfig(settings); err != nil {
return
}
Expand Down Expand Up @@ -142,7 +143,7 @@ func (i *Initiator) waitForReconnectInterval(reconnectInterval time.Duration) bo
return true
}

func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, dialer proxy.Dialer) {
func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, dialer proxy.ContextDialer) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -162,14 +163,28 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
return
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan any)

// We start a goroutine in order to be able to cancel the dialer mid-connection
// on receiving a stop signal to stop the initiator.
go func() {
select {
case <-i.stopChan:
cancel()
case <-done:
return
}
}()

var disconnected chan interface{}
var msgIn chan fixIn
var msgOut chan []byte

address := session.SocketConnectAddress[connectionAttempt%len(session.SocketConnectAddress)]
session.log.OnEventf("Connecting to: %v", address)

netConn, err := dialer.Dial("tcp", address)
netConn, err := dialer.DialContext(ctx, "tcp", address)
if err != nil {
session.log.OnEventf("Failed to connect: %v", err)
goto reconnect
Expand Down Expand Up @@ -211,10 +226,15 @@ func (i *Initiator) handleConnection(session *session, tlsConfig *tls.Config, di
select {
case <-disconnected:
case <-i.stopChan:
cancel()
close(done)
return
}

reconnect:
cancel()
close(done)

connectionAttempt++
session.log.OnEventf("Reconnecting in %v", session.ReconnectInterval)
if !i.waitForReconnectInterval(session.ReconnectInterval) {
Expand Down

0 comments on commit 5121af8

Please sign in to comment.