Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain repeating group field order when parsing messages #636

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ linters-install:
lint: linters-install
golangci-lint run

# An easy way to run the linter without going through the install process -
# docker run -t --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.57.2 golangci-lint run -v
# See https://golangci-lint.run/welcome/install/ for more details.

# ---------------------------------------------------------------
# Targets related to running acceptance tests -

Expand Down
6 changes: 5 additions & 1 deletion in_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
nextSeqNum := seqNum
msg := NewMessage()
for _, msgBytes := range msgs {
_ = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
err = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
if err != nil {
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
return // We cant continue with a message that cant be parsed correctly.
}
msgType, _ := msg.Header.GetBytes(tagMsgType)
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)

Expand Down
282 changes: 224 additions & 58 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ import (
// Header is first section of a FIX Message.
type Header struct{ FieldMap }

// msgparser contains message parsing vars needed to parse a string into a message.
type msgParser struct {
msg *Message
transportDataDictionary *datadictionary.DataDictionary
appDataDictionary *datadictionary.DataDictionary
rawBytes []byte
fieldIndex int
parsedFieldBytes *TagValue
trailerBytes []byte
foundBody bool
foundTrailer bool
}

// in the message header, the first 3 tags in the message header must be 8,9,35.
func headerFieldOrdering(i, j Tag) bool {
var ordering = func(t Tag) uint32 {
Expand Down Expand Up @@ -152,124 +165,134 @@ func ParseMessageWithDataDictionary(
msg *Message,
rawMessage *bytes.Buffer,
transportDataDictionary *datadictionary.DataDictionary,
_ *datadictionary.DataDictionary,
appDataDictionary *datadictionary.DataDictionary,
) (err error) {
msg.Header.Clear()
msg.Body.Clear()
msg.Trailer.Clear()
msg.rawMessage = rawMessage
// Create msgparser before we go any further.
mp := &msgParser{
msg: msg,
transportDataDictionary: transportDataDictionary,
appDataDictionary: appDataDictionary,
}
mp.msg.rawMessage = rawMessage
mp.rawBytes = rawMessage.Bytes()

rawBytes := rawMessage.Bytes()
return doParsing(mp)
}

// Allocate fields in one chunk.
// doParsing executes the message parsing process.
func doParsing(mp *msgParser) (err error) {
// Initialize for parsing.
mp.msg.Header.Clear()
mp.msg.Body.Clear()
mp.msg.Trailer.Clear()

// Allocate expected message fields in one chunk.
fieldCount := 0
for _, b := range rawBytes {
for _, b := range mp.rawBytes {
if b == '\001' {
fieldCount++
}
}

if fieldCount == 0 {
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(rawBytes))}
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(mp.rawBytes))}
}

if cap(msg.fields) < fieldCount {
msg.fields = make([]TagValue, fieldCount)
if cap(mp.msg.fields) < fieldCount {
mp.msg.fields = make([]TagValue, fieldCount)
} else {
msg.fields = msg.fields[0:fieldCount]
mp.msg.fields = mp.msg.fields[0:fieldCount]
}

fieldIndex := 0

// Message must start with begin string, body length, msg type.
if rawBytes, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawBytes); err != nil {
// Get begin string.
if mp.rawBytes, err = extractSpecificField(&mp.msg.fields[mp.fieldIndex], tagBeginString, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

parsedFieldBytes := &msg.fields[fieldIndex]
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawBytes); err != nil {
// Get body length.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagBodyLength, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

parsedFieldBytes = &msg.fields[fieldIndex]
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawBytes); err != nil {
// Get msg type.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagMsgType, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

// Start parsing.
mp.fieldIndex++
xmlDataLen := 0
xmlDataMsg := false

msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
fieldIndex++

trailerBytes := []byte{}
foundBody := false
foundTrailer := false
mp.trailerBytes = []byte{}
mp.foundBody = false
mp.foundTrailer = false
for {
parsedFieldBytes = &msg.fields[fieldIndex]
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if xmlDataLen > 0 {
rawBytes, err = extractXMLDataField(parsedFieldBytes, rawBytes, xmlDataLen)
mp.rawBytes, err = extractXMLDataField(mp.parsedFieldBytes, mp.rawBytes, xmlDataLen)
xmlDataLen = 0
xmlDataMsg = true
} else {
rawBytes, err = extractField(parsedFieldBytes, rawBytes)
mp.rawBytes, err = extractField(mp.parsedFieldBytes, mp.rawBytes)
}
if err != nil {
return
}

switch {
case isHeaderField(parsedFieldBytes.tag, transportDataDictionary):
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
case isTrailerField(parsedFieldBytes.tag, transportDataDictionary):
msg.Trailer.add(msg.fields[fieldIndex : fieldIndex+1])
foundTrailer = true
case isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
case isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
case isNumInGroupField(mp.msg, []Tag{mp.parsedFieldBytes.tag}, mp.appDataDictionary):
parseGroup(mp, []Tag{mp.parsedFieldBytes.tag})
default:
foundBody = true
trailerBytes = rawBytes
msg.Body.add(msg.fields[fieldIndex : fieldIndex+1])
mp.foundBody = true
mp.trailerBytes = mp.rawBytes
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
}
if parsedFieldBytes.tag == tagCheckSum {
if mp.parsedFieldBytes.tag == tagCheckSum {
break
}

if !foundBody {
msg.bodyBytes = rawBytes
if !mp.foundBody {
mp.msg.bodyBytes = mp.rawBytes
}

if parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = msg.Header.GetInt(tagXMLDataLen)
if mp.parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
}
fieldIndex++
mp.fieldIndex++
}

// This will happen if there are no fields in the body
if foundTrailer && !foundBody {
trailerBytes = rawBytes
msg.bodyBytes = nil
if mp.foundTrailer && !mp.foundBody {
mp.trailerBytes = mp.rawBytes
mp.msg.bodyBytes = nil
}

// Body length would only be larger than trailer if fields out of order.
if len(msg.bodyBytes) > len(trailerBytes) {
msg.bodyBytes = msg.bodyBytes[:len(msg.bodyBytes)-len(trailerBytes)]
if len(mp.msg.bodyBytes) > len(mp.trailerBytes) {
mp.msg.bodyBytes = mp.msg.bodyBytes[:len(mp.msg.bodyBytes)-len(mp.trailerBytes)]
}

length := 0
for _, field := range msg.fields {
for _, field := range mp.msg.fields {
switch field.tag {
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
default:
length += field.length()
}
}

bodyLength, err := msg.Header.GetInt(tagBodyLength)
bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
if err != nil {
err = parseError{OrigError: err.Error()}
} else if length != bodyLength && !xmlDataMsg {
Expand All @@ -279,6 +302,149 @@ func ParseMessageWithDataDictionary(
return
}

// parseGroup iterates through a repeating group to maintain correct order of those fields.
func parseGroup(mp *msgParser, tags []Tag) {
mp.foundBody = true
dm := mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields := getGroupFields(mp.msg, tags, mp.appDataDictionary)

for {
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
mp.rawBytes, _ = extractField(mp.parsedFieldBytes, mp.rawBytes)
mp.trailerBytes = mp.rawBytes

// Is this field a member for the group.
if isGroupMember(mp.parsedFieldBytes.tag, fields) {
// Is this field a nested repeating group.
if isNumInGroupField(mp.msg, append(tags, mp.parsedFieldBytes.tag), mp.appDataDictionary) {
dm = append(dm, *mp.parsedFieldBytes)
tags = append(tags, mp.parsedFieldBytes.tag)
fields = getGroupFields(mp.msg, tags, mp.appDataDictionary)
continue
}
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
} else if isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found a header tag for some reason..
mp.msg.Body.add(dm)
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
break
} else if isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found the trailer at the end of the message.
mp.msg.Body.add(dm)
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
break
} else {
// Found a body field outside the group.
searchTags := []Tag{mp.parsedFieldBytes.tag}
// Is this a new group not inside the existing group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the current repeating group.
mp.msg.Body.add(dm)
// Cycle again with the new group.
dm = mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
if len(tags) > 1 {
searchTags = tags[:len(tags)-1]
}
// Did this tag occur after a nested group and belongs to the parent group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
// Continue parsing the parent group.
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
// Add the repeating group.
mp.msg.Body.add(dm)
// Add the next body field.
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])

break
}
}
}

// isNumInGroupField evaluates if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
if err != nil {
return false
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return true
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return false
}

// getGroupFields gets the relevant fields for parsing a repeating group if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
if appDataDictionary != nil {
msgt, err := msg.MsgType()
if err != nil {
return
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return fd.Fields
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return
}

// isGroupMember evaluates if this tag belongs to a repeating group.
func isGroupMember(tag Tag, fields []*datadictionary.FieldDef) bool {
for _, f := range fields {
if f.Tag() == int(tag) {
return true
}
}
return false
}

func isHeaderField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
if tag.IsHeader() {
return true
Expand Down
Loading
Loading