Skip to content

Commit

Permalink
Merge pull request #523 from shipa988/dynamic-session-deadlock-bug-fix
Browse files Browse the repository at this point in the history
Dynamic session deadlock bug fix
  • Loading branch information
ackleymi authored Nov 29, 2022
2 parents 7945e4b + 2941db5 commit 7b665fe
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

//The Session is the primary FIX abstraction for message communication
// The Session is the primary FIX abstraction for message communication
type session struct {
store MessageStore

Expand Down Expand Up @@ -50,8 +50,8 @@ func (s *session) logError(err error) {
s.log.OnEvent(err.Error())
}

//TargetDefaultApplicationVersionID returns the default application version ID for messages received by this version.
//Applicable for For FIX.T.1 sessions.
// TargetDefaultApplicationVersionID returns the default application version ID for messages received by this version.
// Applicable for For FIX.T.1 sessions.
func (s *session) TargetDefaultApplicationVersionID() string {
return s.targetDefaultApplVerID
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (s *session) resend(msg *Message) bool {
return s.application.ToApp(msg, s.sessionID) == nil
}

//queueForSend will validate, persist, and queue the message for send
// queueForSend will validate, persist, and queue the message for send
func (s *session) queueForSend(msg *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
Expand All @@ -224,7 +224,7 @@ func (s *session) queueForSend(msg *Message) error {
return nil
}

//send will validate, persist, queue the message. If the session is logged on, send all messages in the queue
// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue
func (s *session) send(msg *Message) error {
return s.sendInReplyTo(msg, nil)
}
Expand All @@ -247,7 +247,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
return nil
}

//dropAndReset will drop the send queue and reset the message store
// dropAndReset will drop the send queue and reset the message store
func (s *session) dropAndReset() error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
Expand All @@ -256,7 +256,7 @@ func (s *session) dropAndReset() error {
return s.store.Reset()
}

//dropAndSend will validate and persist the message, then drops the send queue and sends the message.
// dropAndSend will validate and persist the message, then drops the send queue and sends the message.
func (s *session) dropAndSend(msg *Message) error {
return s.dropAndSendInReplyTo(msg, nil)
}
Expand Down Expand Up @@ -738,12 +738,26 @@ func (s *session) onAdmin(msg interface{}) {

func (s *session) run() {
s.Start(s)
var stopChan = make(chan struct{})
s.stateTimer = internal.NewEventTimer(func() {
select {
//deadlock in write to chan s.sessionEvent after s.Stopped()==true and end of loop session.go:766 because no reader of chan s.sessionEvent
case s.sessionEvent <- internal.NeedHeartbeat:
case <-stopChan:
}
})
s.peerTimer = internal.NewEventTimer(func() {
select {
//deadlock in write to chan s.sessionEvent after s.Stopped()==true and end of loop session.go:766 because no reader of chan s.sessionEvent
case s.sessionEvent <- internal.PeerTimeout:
case <-stopChan:
}

s.stateTimer = internal.NewEventTimer(func() { s.sessionEvent <- internal.NeedHeartbeat })
s.peerTimer = internal.NewEventTimer(func() { s.sessionEvent <- internal.PeerTimeout })
})
ticker := time.NewTicker(time.Second)

defer func() {
close(stopChan)
s.stateTimer.Stop()
s.peerTimer.Stop()
ticker.Stop()
Expand Down

0 comments on commit 7b665fe

Please sign in to comment.