Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic session deadlock bug fix #523

Merged
merged 2 commits into from
Nov 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/quickfixgo/quickfix/internal"
)

//The Session is the primary FIX abstraction for message communication
// The Session is the primary FIX abstraction for message communication
type session struct {
store MessageStore

Expand Down Expand Up @@ -50,8 +50,8 @@ func (s *session) logError(err error) {
s.log.OnEvent(err.Error())
}

//TargetDefaultApplicationVersionID returns the default application version ID for messages received by this version.
//Applicable for For FIX.T.1 sessions.
// TargetDefaultApplicationVersionID returns the default application version ID for messages received by this version.
// Applicable for For FIX.T.1 sessions.
func (s *session) TargetDefaultApplicationVersionID() string {
return s.targetDefaultApplVerID
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (s *session) resend(msg *Message) bool {
return s.application.ToApp(msg, s.sessionID) == nil
}

//queueForSend will validate, persist, and queue the message for send
// queueForSend will validate, persist, and queue the message for send
func (s *session) queueForSend(msg *Message) error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
Expand All @@ -224,7 +224,7 @@ func (s *session) queueForSend(msg *Message) error {
return nil
}

//send will validate, persist, queue the message. If the session is logged on, send all messages in the queue
// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue
func (s *session) send(msg *Message) error {
return s.sendInReplyTo(msg, nil)
}
Expand All @@ -247,7 +247,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
return nil
}

//dropAndReset will drop the send queue and reset the message store
// dropAndReset will drop the send queue and reset the message store
func (s *session) dropAndReset() error {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()
Expand All @@ -256,7 +256,7 @@ func (s *session) dropAndReset() error {
return s.store.Reset()
}

//dropAndSend will validate and persist the message, then drops the send queue and sends the message.
// dropAndSend will validate and persist the message, then drops the send queue and sends the message.
func (s *session) dropAndSend(msg *Message) error {
return s.dropAndSendInReplyTo(msg, nil)
}
Expand Down Expand Up @@ -738,12 +738,26 @@ func (s *session) onAdmin(msg interface{}) {

func (s *session) run() {
s.Start(s)
var stopChan = make(chan struct{})
s.stateTimer = internal.NewEventTimer(func() {
select {
//deadlock in write to chan s.sessionEvent after s.Stopped()==true and end of loop session.go:766 because no reader of chan s.sessionEvent
case s.sessionEvent <- internal.NeedHeartbeat:
case <-stopChan:
}
})
s.peerTimer = internal.NewEventTimer(func() {
select {
//deadlock in write to chan s.sessionEvent after s.Stopped()==true and end of loop session.go:766 because no reader of chan s.sessionEvent
case s.sessionEvent <- internal.PeerTimeout:
case <-stopChan:
}

s.stateTimer = internal.NewEventTimer(func() { s.sessionEvent <- internal.NeedHeartbeat })
s.peerTimer = internal.NewEventTimer(func() { s.sessionEvent <- internal.PeerTimeout })
})
ticker := time.NewTicker(time.Second)

defer func() {
close(stopChan)
s.stateTimer.Stop()
s.peerTimer.Stop()
ticker.Stop()
Expand Down