From: Moiz Raja Date: Tue, 3 Mar 2015 03:49:41 +0000 (-0800) Subject: Introduce a mechanism for a Follower to signal it's sync up status X-Git-Tag: release/lithium~459^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=6eb0469b729fbe69dad58ce6223fc231669089c7 Introduce a mechanism for a Follower to signal it's sync up status This patch introduces a new message FollowerInitialSyncUpStatus. The purpose of the message is to inform the RaftActor or a sub-class thereof whether the Follower's commitIndex is now atleast at the same level as the Leader was when it first sent a heartbeat to the Follower. This will hopefully be useful in a rolling upgrade scenario where we may have just brought up a new node and need to wait for it to be synced with the current leader before we bring down and upgrade another node in the cluster. Change-Id: If19e4d98c2be708fc6a35823ca92cfac7ca87394 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java new file mode 100644 index 0000000000..3ce1f5d1e8 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/FollowerInitialSyncUpStatus.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.raft.base.messages; + +/** + * The FollowerInitialSyncUpStatus is sent by a Follower to inform any RaftActor subclass whether the Follower + * is at least at the same commitIndex as the Leader was when it sent the follower the very first heartbeat. + * + * This status can be used to determine if a Follower has caught up with the current Leader in an upgrade scenario + * for example. + * + */ +public class FollowerInitialSyncUpStatus { + private final boolean initialSyncDone; + + public FollowerInitialSyncUpStatus(boolean initialSyncDone){ + this.initialSyncDone = initialSyncDone; + } + + public boolean isInitialSyncDone() { + return initialSyncDone; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index c799441d60..618865cb88 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -36,12 +37,18 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; */ public class Follower extends AbstractRaftActorBehavior { + + private SnapshotTracker snapshotTracker = null; + private final InitialSyncStatusTracker initialSyncStatusTracker; + public Follower(RaftActorContext context) { super(context, RaftState.Follower); scheduleElection(electionDuration()); + + initialSyncStatusTracker = new InitialSyncStatusTracker(context.getActor()); } private boolean isLogEntryPresent(long index){ @@ -71,6 +78,10 @@ public class Follower extends AbstractRaftActorBehavior { return -1; } + private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){ + initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex()); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -97,6 +108,7 @@ public class Follower extends AbstractRaftActorBehavior { long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex()); boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex()); + updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId()); boolean outOfSync = true; @@ -280,7 +292,6 @@ public class Follower extends AbstractRaftActorBehavior { private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) { - LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}", logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(), installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks()); @@ -289,6 +300,8 @@ public class Follower extends AbstractRaftActorBehavior { snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); } + updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); + try { if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())){ @@ -338,4 +351,36 @@ public class Follower extends AbstractRaftActorBehavior { SnapshotTracker getSnapshotTracker(){ return snapshotTracker; } + + private static class InitialSyncStatusTracker { + + private static final long INVALID_LOG_INDEX = -2L; + private long initialLeaderCommit = INVALID_LOG_INDEX; + private boolean initialSyncUpDone = false; + private String syncedLeaderId = null; + private final ActorRef actor; + + public InitialSyncStatusTracker(ActorRef actor) { + this.actor = actor; + } + + public void update(String leaderId, long leaderCommit, long commitIndex){ + + if(!leaderId.equals(syncedLeaderId)){ + initialSyncUpDone = false; + initialLeaderCommit = INVALID_LOG_INDEX; + syncedLeaderId = leaderId; + } + + if(!initialSyncUpDone){ + if(initialLeaderCommit == INVALID_LOG_INDEX){ + actor.tell(new FollowerInitialSyncUpStatus(false), ActorRef.noSender()); + initialLeaderCommit = leaderCommit; + } else if(commitIndex >= initialLeaderCommit){ + actor.tell(new FollowerInitialSyncUpStatus(true), ActorRef.noSender()); + initialSyncUpDone = true; + } + } + } + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 4e8e7fe11b..29fb613327 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,6 +1,8 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; @@ -19,6 +21,7 @@ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -118,6 +121,178 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("isVoteGranted", false, reply.isVoteGranted()); } + + @Test + public void testHandleFirstAppendEntries() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + } + + @Test + public void testHandleSyncUpAppendEntries() throws Exception { + logStart("testHandleSyncUpAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + followerActor.underlyingActor().clear(); + + // Sending the same message again should not generate another message + + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertNull(syncStatus); + + } + + @Test + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + + @Test + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -358,7 +533,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendAfterInstallingSnapshot(){ + public void testHandleAppendEntriesAfterInstallingSnapshot(){ logStart("testHandleAppendAfterInstallingSnapshot"); MockRaftActorContext context = createActorContext(); @@ -449,7 +624,64 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); } - Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + } + + @Test + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + logStart("testInitialSyncUpWithHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } + + FollowerInitialSyncUpStatus syncStatus = + MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); } @Test @@ -479,7 +711,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getTerm", 1, reply.getTerm()); assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); - Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){