From a0b8be5ce48c0d1e0b573d1952211913c58d4935 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 31 Aug 2016 00:55:57 -0400 Subject: [PATCH 1/1] Bug 6587: Retain state when transitioning between Leader and IsolatedLeader If there's a transaction in the COMMIT_PENDING state, i.e. it has been persisted and is in the process of being replicated, and the Leader switches to IsolatedLeader, the ClientRequestTracker state is lost. As a result, when the follower(s) come back and replication consensus is achieved and the tx is applied to state, the tx ID isn't available and the ShardDataTree applies it as a foreign candidate, leaving the tx in the pending queue. This prevents subsequent transactions from making progress. To fix this, we need to retain/copy the internal leader state when transitioning between Leader and IsolatedLeader. Change-Id: If06996dccf083fd5d37757fd91fde2eb0eb82ea1 Signed-off-by: Tom Pantelis --- .../raft/behaviors/AbstractLeader.java | 19 ++-- .../behaviors/AbstractRaftActorBehavior.java | 9 +- .../raft/behaviors/IsolatedLeader.java | 9 +- .../cluster/raft/behaviors/Leader.java | 2 +- ...butedDataStoreRemotingIntegrationTest.java | 88 ++++++++----------- 5 files changed, 58 insertions(+), 69 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index befc6d4129..b241e0a67a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -96,7 +96,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot; + 
private Optional snapshot = Optional.absent();; private int minReplicationCount; protected AbstractLeader(RaftActorContext context, RaftState state, @@ -105,6 +105,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); + mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot); + snapshot = initializeFromLeader.snapshot; + trackers.addAll(initializeFromLeader.trackers); } else { for(PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); @@ -116,8 +119,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updateMinReplicaCount(); - snapshot = Optional.absent(); - // Immediately schedule a heartbeat // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to @@ -299,7 +300,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.trace("{}: checking Nth index {}", logName(), N); for (FollowerLogInformation info : followerToLog.values()) { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); - if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { + if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) { replicatedCount++; } else if(LOG.isTraceEnabled()) { LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), @@ -699,9 +700,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // If the follower's nextIndex is -1 then we might as well send it a snapshot // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present // in the snapshot - return (nextIndex == -1 || + return nextIndex == -1 || (!context.getReplicatedLog().isPresent(nextIndex) - && context.getReplicatedLog().isInSnapshot(nextIndex))); + && 
context.getReplicatedLog().isInSnapshot(nextIndex)); } @@ -841,7 +842,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } } - return (minPresent != 0); + return minPresent != 0; } /** @@ -863,8 +864,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public FollowerToSnapshot(ByteString snapshotBytes) { this.snapshotBytes = snapshotBytes; int size = snapshotBytes.size(); - totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) + - ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); + totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) + + (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", logName(), size, totalChunks); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index d6a4e9d98e..375b3779b8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -419,13 +419,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { } protected RaftActorBehavior internalSwitchBehavior(RaftState newState) { - if(context.getRaftPolicy().automaticElectionsEnabled()){ - return internalSwitchBehavior(createBehavior(context, newState)); - } - return this; + return internalSwitchBehavior(createBehavior(context, newState)); } protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) { + if(!context.getRaftPolicy().automaticElectionsEnabled()) { + return this; + } + 
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state()); try { close(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java index c969586722..a02e40092b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import akka.actor.ActorRef; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; @@ -26,8 +27,12 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; * */ public class IsolatedLeader extends AbstractLeader { + IsolatedLeader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) { + super(context, RaftState.IsolatedLeader, initializeFromLeader); + } + public IsolatedLeader(RaftActorContext context) { - super(context, RaftState.IsolatedLeader); + this(context, null); } // we received an Append Entries reply, we should switch the Behavior to Leader @@ -40,7 +45,7 @@ public class IsolatedLeader extends AbstractLeader { // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) { LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId()); - return internalSwitchBehavior(RaftState.Leader); + return internalSwitchBehavior(new Leader(context, this)); } return ret; } diff --git 
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java index 58cf716106..827364c29f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java @@ -71,7 +71,7 @@ public class Leader extends AbstractLeader { if (isLeaderIsolated()) { LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader", context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId()); - return internalSwitchBehavior(RaftState.IsolatedLeader); + return internalSwitchBehavior(new IsolatedLeader(context, this)); } else { return this; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index d6faff694c..4c0aac4834 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -41,16 +41,12 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import 
org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; -import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier; -import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; -import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree; @@ -58,7 +54,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; @@ -706,19 +701,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Wait for the commit to be replicated to the follower. 
- MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() { - @Override - public void verify(OnDemandRaftState raftState) { - assertEquals("getLastApplied", 0, raftState.getLastApplied()); - } - }); + MemberNode.verifyRaftState(followerDistributedDataStore, "cars", + raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); - MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() { - @Override - public void verify(OnDemandRaftState raftState) { - assertEquals("getLastApplied", 0, raftState.getLastApplied()); - } - }); + MemberNode.verifyRaftState(followerDistributedDataStore, "people", + raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in // the leader shard. @@ -769,12 +756,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex))); readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast()); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { - @Override - public void verify(ShardStats stats) { - assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()); - } - }); + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount())); // Disable elections on the leader so it switches to follower. 
@@ -828,24 +811,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { - @Override - public void verify(ShardStats stats) { - assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); - } - }); + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize())); writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); writeTx.write(CarsModel.newCarPath("optima"), car); DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { - @Override - public void verify(ShardStats stats) { - assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); - } - }); + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", + stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize())); // Gracefully stop the leader via a Shutdown message. @@ -877,29 +852,40 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Throwable { - leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200); + // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. 
+ leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); String testName = "testTransactionWithIsolatedLeader"; initDatastoresWithCars(testName); - DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); - failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader. + DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + // Tx that is submitted after the leader transitions to IsolatedLeader. + DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + + // Tx that is submitted after the follower is reinstated. DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + // Stop the follower followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager()); followerDistributedDataStore.close(); followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager()); - MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() { - @Override - public void verify(OnDemandRaftState raftState) { - assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()); - } - }); + // Submit the preIsolatedLeaderWriteTx so it's pending + DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready(); + + // Change the isolated leader check interval low so it changes to IsolatedLeader. + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. 
+ shardIsolatedLeaderCheckIntervalInMillis(200)); + + MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", + raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState())); try { - leaderTestKit.doCommit(failWriteTx.ready()); + leaderTestKit.doCommit(noShardLeaderWriteTx.ready()); fail("Expected NoShardLeaderException"); } catch (ExecutionException e) { assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass()); @@ -908,12 +894,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. shardElectionTimeoutFactor(100)); - DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready(); + DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready(); followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); - leaderTestKit.doCommit(writeTxCohort); + leaderTestKit.doCommit(preIsolatedLeaderTxCohort); + leaderTestKit.doCommit(successTxCohort); } @Test(expected=AskTimeoutException.class) @@ -1017,12 +1004,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build()); DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); - Answer answer = new Answer() { - @Override - public DatastoreContext answer(InvocationOnMock invocation) { - return newBuilder.build(); - } - }; + Answer answer = invocation -> newBuilder.build(); Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext(); Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString()); dataStore.onDatastoreContextUpdated(mockContextFactory); -- 2.36.6