diff --git a/consistence/data_placement_mgr.go b/consistence/data_placement_mgr.go index c754c8a0..8c4e7c86 100644 --- a/consistence/data_placement_mgr.go +++ b/consistence/data_placement_mgr.go @@ -1614,6 +1614,7 @@ func (dpm *DataPlacement) prepareCandidateNodesForNewLeader(topicInfo *TopicPart coordLog.Infof("failed to get log id on replica: %v, %v", replica, err) continue } + // TODO: maybe we should check the epoch in the log, is it possible we have higher epoch with less log id? if cid > newestLogID { newestReplicas = newestReplicas[0:0] newestReplicas = append(newestReplicas, replica) diff --git a/consistence/nsqd_coordinator.go b/consistence/nsqd_coordinator.go index dd8d8a40..353c176c 100644 --- a/consistence/nsqd_coordinator.go +++ b/consistence/nsqd_coordinator.go @@ -742,8 +742,11 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error { partition := topic.GetTopicPart() if tc, err := ncoord.getTopicCoordData(topicName, partition); err == nil && tc != nil { // already loaded + if tc.IsNeedFix() { + go ncoord.requestLeaveFromISR(topicName, partition) + } if tc.topicLeaderSession.LeaderNode == nil || tc.topicLeaderSession.Session == "" { - if tc.topicInfo.Leader == ncoord.myNode.GetID() { + if tc.topicInfo.Leader == ncoord.myNode.GetID() && !tc.IsNeedFix() { err := ncoord.acquireTopicLeader(&tc.topicInfo) if err != nil { coordLog.Infof("failed to acquire leader : %v", err) @@ -869,6 +872,7 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error { if localErr != nil { coordLog.Errorf("check local topic %v data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr) topic.SetDataFixState(true) + tc.GetData().setNeedFix(true) go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition) } else if !topicInfo.OrderedMulti { delayQ := topic.GetDelayedQueue() @@ -876,6 +880,7 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error { if localErr != nil { coordLog.Errorf("check local topic %v delayed queue data need to be fixed:%v", 
topicInfo.GetTopicDesp(), localErr) delayQ.SetDataFixState(true) + tc.GetData().setNeedFix(true) go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition) } if delayQ != nil { @@ -883,11 +888,12 @@ func (ncoord *NsqdCoordinator) loadLocalTopicData() error { if localErr != nil { coordLog.Errorf("check local topic %v delayed queue data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr) delayQ.SetDataFixState(true) + tc.GetData().setNeedFix(true) go ncoord.requestLeaveFromISR(topicInfo.Name, topicInfo.Partition) } } } - if topicInfo.Leader == ncoord.myNode.GetID() { + if !tc.IsNeedFix() && topicInfo.Leader == ncoord.myNode.GetID() { coordLog.Infof("topic %v starting as leader.", topicInfo.GetTopicDesp()) tc.DisableWrite(true) err := ncoord.acquireTopicLeader(topicInfo) @@ -1575,6 +1581,8 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator, localLogSegStart, _, _ := logMgr.GetLogStartInfo() countNumIndex, _ := logMgr.ConvertToCountIndex(logIndex, offset) + // TODO: here we need check if we have the same epoch with leader in the last commit log, + // If not, we need to truncate the catchup to make sure the last old epoch log is the same with leader coordLog.Infof("topic %v catchup commit log begin :%v at: %v:%v:%v", topicInfo.GetTopicDesp(), localLogSegStart, logIndex, offset, countNumIndex) for offset > localLogSegStart.SegmentStartOffset || logIndex > localLogSegStart.SegmentStartIndex { @@ -1693,6 +1701,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator, if localErr != nil { coordLog.Errorf("check local topic %v data need to be fixed:%v", topicInfo.GetTopicDesp(), localErr) localLogQ.SetDataFixState(true) + tc.setNeedFix(true) } if localLogQ.IsDataNeedFix() { @@ -1714,6 +1723,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator, localTopic.Unlock() if localErr != nil { localLogQ.SetDataFixState(true) + tc.setNeedFix(true) coordLog.Errorf("failed to reset 
local topic %v data: %v", localTopic.GetFullName(), localErr) return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr} } @@ -1743,6 +1753,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator, if localErr != nil { coordLog.Errorf("failed to reset local topic data: %v", localErr) localLogQ.SetDataFixState(true) + tc.setNeedFix(true) return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr} } _, localErr = logMgr.TruncateToOffsetV2(0, 0) @@ -1779,6 +1790,7 @@ func (ncoord *NsqdCoordinator) decideCatchupCommitLogInfo(tc *TopicCoordinator, localTopic.Unlock() if localErr != nil { localLogQ.SetDataFixState(true) + tc.setNeedFix(true) coordLog.Warningf("reset topic %v queue with start %v failed: %v", topicInfo.GetTopicDesp(), firstLogData, localErr) return logIndex, offset, needFullSync, &CoordErr{localErr.Error(), RpcNoErr, CoordLocalErr} } @@ -1914,6 +1926,7 @@ func (ncoord *NsqdCoordinator) pullCatchupDataFromLeader(tc *TopicCoordinator, localLogQ.ForceFlush() logIndex, offset = logMgr.GetCurrentEnd() localLogQ.SetDataFixState(false) + tc.setNeedFix(false) if synced { break } @@ -2326,6 +2339,9 @@ func (ncoord *NsqdCoordinator) updateTopicInfo(topicCoord *TopicCoordinator, sho return err } + if newCoordData.IsNeedFix() { + go ncoord.requestLeaveFromISR(newTopicInfo.Name, newTopicInfo.Partition) + } if newTopicInfo.Leader == ncoord.myNode.GetID() { // not leader before and became new leader if oldData.GetLeader() != ncoord.myNode.GetID() { @@ -2337,7 +2353,7 @@ func (ncoord *NsqdCoordinator) updateTopicInfo(topicCoord *TopicCoordinator, sho if shouldDisableWrite { topicCoord.DisableWrite(true) } - if needAcquireLeaderSession { + if needAcquireLeaderSession && !newCoordData.IsNeedFix() { go ncoord.acquireTopicLeader(newTopicInfo) } } else { @@ -2359,6 +2375,10 @@ func (ncoord *NsqdCoordinator) notifyAcquireTopicLeader(coord *coordData) *Coord if 
atomic.LoadInt32(&ncoord.stopping) == 1 { return ErrClusterChanged } + if coord.IsNeedFix() { + coordLog.Warningf("topic %v is need fix, should not acquire leader", coord.topicInfo.GetTopicDesp()) + return nil + } coordLog.Infof("I am notified to acquire topic leader %v.", coord.topicInfo) go ncoord.acquireTopicLeader(&coord.topicInfo) return nil @@ -2384,6 +2404,7 @@ func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error { localTopic.Unlock() if localErr == nil { localTopic.SetDataFixState(false) + topicCoord.setNeedFix(false) } return nil } @@ -2410,6 +2431,7 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator atomic.StoreInt32(&topicCoord.disableWrite, 1) isWriteDisabled = true localTopic.SetDataFixState(true) + tcData.setNeedFix(true) localTopic.DisableForSlave(master) } if tcData.delayedLogMgr != nil && !tcData.topicInfo.OrderedMulti { @@ -2420,6 +2442,7 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator isWriteDisabled = true localTopic.GetDelayedQueue().SetDataFixState(true) localTopic.DisableForSlave(master) + tcData.setNeedFix(true) } } localTopic.Unlock() @@ -2555,6 +2578,10 @@ func (ncoord *NsqdCoordinator) updateTopicLeaderSession(topicCoord *TopicCoordin } tcData.updateBufferSize(int(dyConf.SyncEvery - 1)) localTopic.SetDynamicInfo(*dyConf, tcData.logMgr) + + if tcData.IsNeedFix() { + go ncoord.requestLeaveFromISR(tcData.topicInfo.Name, tcData.topicInfo.Partition) + } // leader changed (maybe down), we make sure out data is flushed to keep data safe ncoord.switchStateForMaster(topicCoord, localTopic, false) @@ -2564,7 +2591,7 @@ func (ncoord *NsqdCoordinator) updateTopicLeaderSession(topicCoord *TopicCoordin } else { if newLS == nil || newLS.LeaderNode == nil || newLS.Session == "" { coordLog.Infof("topic leader is missing : %v", tcData.topicInfo.GetTopicDesp()) - if tcData.GetLeader() == ncoord.myNode.GetID() { + if tcData.GetLeader() == ncoord.myNode.GetID() && 
!tcData.IsNeedFix() { go ncoord.acquireTopicLeader(&tcData.topicInfo) } } else { diff --git a/consistence/topic_coordinator.go b/consistence/topic_coordinator.go index de35f698..c76d7633 100644 --- a/consistence/topic_coordinator.go +++ b/consistence/topic_coordinator.go @@ -124,6 +124,15 @@ type coordData struct { syncedConsumeMgr *ChannelConsumeMgr logMgr *TopicCommitLogMgr delayedLogMgr *TopicCommitLogMgr + needFix int32 +} + +func (cd *coordData) setNeedFix(needFix bool) { + if needFix { + atomic.StoreInt32(&cd.needFix, 1) + } else { + atomic.StoreInt32(&cd.needFix, 0) + } } func (cd *coordData) updateBufferSize(bs int) { @@ -326,6 +335,10 @@ func (tc *TopicCoordinator) IsForceLeave() bool { return atomic.LoadInt32(&tc.forceLeave) == 1 } +func (cd *coordData) IsNeedFix() bool { + return atomic.LoadInt32(&cd.needFix) == 1 +} + func (cd *coordData) GetLeader() string { return cd.topicInfo.Leader } diff --git a/nsqadmin/http.go b/nsqadmin/http.go index a8bf3fc1..46999f98 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -495,6 +495,9 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } + if producers.Len() == 0 { + return nil, http_api.Err{404, "NODE_NOT_FOUND"} + } topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) @@ -587,6 +590,9 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } + if producers.Len() == 0 { + return nil, http_api.Err{404, "NODE_NOT_FOUND"} + } _, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index e6d52d9c..b021fac9 100644 --- a/nsqd/channel_test.go +++ 
b/nsqd/channel_test.go @@ -1627,6 +1627,8 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) { topic := nsqd.GetTopicWithExt(topicName, 0, true) channel := topic.GetChannel("order_channel") channel.SetOrdered(true) + // sleep here since set order will reset reader + time.Sleep(time.Millisecond) channel.doSkipZanTest(false) msgs := make([]*Message, 0, 3) @@ -1663,7 +1665,7 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) { } topic.PutMessages(msgs) topic.flushBuffer(true) - time.Sleep(time.Second) + time.Sleep(time.Millisecond) // consume normal message and some test message for i := 0; i < 3; i++ { outputMsg := <-channel.clientMsgChan @@ -1672,7 +1674,6 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) { channel.FinishMessageForce(0, "", outputMsg.ID, true) channel.ContinueConsumeForOrder() } - time.Sleep(time.Millisecond * 10) // make sure zan test timeout outputMsg := <-channel.clientMsgChan t.Logf("consume %v", string(outputMsg.Body)) @@ -1681,7 +1682,7 @@ func TestChannelSkipZanTestForOrdered(t *testing.T) { time.Sleep(time.Millisecond * 10) // skip zan test soon to make sure the zan test is inflight channel.doSkipZanTest(true) - time.Sleep(time.Second * 3) + time.Sleep(opts.MsgTimeout * 2) toC := time.After(time.Second * 30) // set zan test skip and should continue consume normal messages diff --git a/nsqd/delay_queue.go b/nsqd/delay_queue.go index b5d31878..ccdf4cfa 100644 --- a/nsqd/delay_queue.go +++ b/nsqd/delay_queue.go @@ -828,14 +828,17 @@ func (q *DelayQueue) put(m *Message, rawData []byte, trace bool, checkSize int64 } oldMsgKey = getDelayedMsgDBKey(int(m.DelayedType), m.DelayedChannel, ts, oldID) oldV = b.Get(oldMsgKey) - oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt()) - if err != nil { - return err - } - // the value from old index is not the same message we are inserting, this may happend when old code use the wrong id in index - if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || 
oldMsg.DelayedType != m.DelayedType { - nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m) - oldV = nil + // it may have some old data which only have index but no msg value + if len(oldV) > 0 { + oldMsg, err := DecodeDelayedMessage(oldV, q.IsExt()) + if err != nil { + nsqLog.Warningf("found old delayed index key %v (%v, %v) msg value data wrong: %v, %v", newIndexKey, iv, oldMsgKey, oldV, m) + // we can just delete this safely + } else if oldMsg.DelayedOrigID != m.DelayedOrigID || oldMsg.DelayedChannel != m.DelayedChannel || oldMsg.DelayedType != m.DelayedType { + // the value from old index is not the same message we are inserting, this may happen when old code used the wrong id in index + nsqLog.Infof("found old delayed index key %v (%v, %v) msg value not matched : %v, %v", newIndexKey, iv, oldMsgKey, oldMsg, m) + oldV = nil + } } } exists := oldV != nil