If there's a transaction in the COMMIT_PENDING state, ie it has been persisted and
is in the process of being replicated, and the Leader switches to IsolatedLeader, the
ClientRequestTracker state is lost. As a result when the follower(s) come back and
replication consensus is achieved and the tx is applied to state, the tx ID isn't
available and the ShardDataTree applies it as a foreign candidate, leaving the
tx in the pending queue. This prevents subsequent transactions from making progress.
To fix this, we need to retain/copy the internal leader state when transitioning
between Leader and IsolatedLeader.
Change-Id: If06996dccf083fd5d37757fd91fde2eb0eb82ea1
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
private Cancellable heartbeatSchedule = null;
- private Optional<SnapshotHolder> snapshot;
+ private Optional<SnapshotHolder> snapshot = Optional.absent();;
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state,
if(initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
if(initializeFromLeader != null) {
followerToLog.putAll(initializeFromLeader.followerToLog);
+ mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
+ snapshot = initializeFromLeader.snapshot;
+ trackers.addAll(initializeFromLeader.trackers);
} else {
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
} else {
for(PeerInfo peerInfo: context.getPeers()) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
- snapshot = Optional.absent();
-
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
LOG.trace("{}: checking Nth index {}", logName(), N);
for (FollowerLogInformation info : followerToLog.values()) {
final PeerInfo peerInfo = context.getPeerInfo(info.getId());
- if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+ if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
replicatedCount++;
} else if(LOG.isTraceEnabled()) {
LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
replicatedCount++;
} else if(LOG.isTraceEnabled()) {
LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
- return (nextIndex == -1 ||
+ return nextIndex == -1 ||
(!context.getReplicatedLog().isPresent(nextIndex)
(!context.getReplicatedLog().isPresent(nextIndex)
- && context.getReplicatedLog().isInSnapshot(nextIndex)));
+ && context.getReplicatedLog().isInSnapshot(nextIndex));
- return (minPresent != 0);
+ return minPresent != 0;
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
public FollowerToSnapshot(ByteString snapshotBytes) {
this.snapshotBytes = snapshotBytes;
int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+ totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
+ (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
logName(), size, totalChunks);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
logName(), size, totalChunks);
}
protected RaftActorBehavior internalSwitchBehavior(RaftState newState) {
}
protected RaftActorBehavior internalSwitchBehavior(RaftState newState) {
- if(context.getRaftPolicy().automaticElectionsEnabled()){
- return internalSwitchBehavior(createBehavior(context, newState));
- }
- return this;
+ return internalSwitchBehavior(createBehavior(context, newState));
}
protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
}
protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+ if(!context.getRaftPolicy().automaticElectionsEnabled()) {
+ return this;
+ }
+
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
try {
close();
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
try {
close();
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
*
*/
public class IsolatedLeader extends AbstractLeader {
*
*/
public class IsolatedLeader extends AbstractLeader {
+ IsolatedLeader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
+ super(context, RaftState.IsolatedLeader, initializeFromLeader);
+ }
+
public IsolatedLeader(RaftActorContext context) {
public IsolatedLeader(RaftActorContext context) {
- super(context, RaftState.IsolatedLeader);
}
// we received an Append Entries reply, we should switch the Behavior to Leader
}
// we received an Append Entries reply, we should switch the Behavior to Leader
// changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
// changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
- return internalSwitchBehavior(RaftState.Leader);
+ return internalSwitchBehavior(new Leader(context, this));
if (isLeaderIsolated()) {
LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
if (isLeaderIsolated()) {
LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
- return internalSwitchBehavior(RaftState.IsolatedLeader);
+ return internalSwitchBehavior(new IsolatedLeader(context, this));
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
-import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
// Wait for the commit to be replicated to the follower.
// Wait for the commit to be replicated to the follower.
- MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getLastApplied", 0, raftState.getLastApplied());
- }
- });
+ MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
+ raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
- MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getLastApplied", 0, raftState.getLastApplied());
- }
- });
+ MemberNode.verifyRaftState(followerDistributedDataStore, "people",
+ raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
// Disable elections on the leader so it switches to follower.
// Disable elections on the leader so it switches to follower.
writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
writeTx.write(CarsModel.newCarPath("optima"), car);
DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
writeTx.write(CarsModel.newCarPath("optima"), car);
DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
// Gracefully stop the leader via a Shutdown message.
// Gracefully stop the leader via a Shutdown message.
@Test
public void testTransactionWithIsolatedLeader() throws Throwable {
@Test
public void testTransactionWithIsolatedLeader() throws Throwable {
- leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
+ // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
+ leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
String testName = "testTransactionWithIsolatedLeader";
initDatastoresWithCars(testName);
String testName = "testTransactionWithIsolatedLeader";
initDatastoresWithCars(testName);
- DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
- failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
+ DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ // Tx that is submitted after the leader transitions to IsolatedLeader.
+ DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ // Tx that is submitted after the follower is reinstated.
DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
followerDistributedDataStore.close();
followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
followerDistributedDataStore.close();
followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
- MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
- }
- });
+ // Submit the preIsolatedLeaderWriteTx so it's pending
+ DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
+
+ // Change the isolated leader check interval low so it changes to IsolatedLeader.
+ sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+ shardIsolatedLeaderCheckIntervalInMillis(200));
+
+ MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
+ raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
- leaderTestKit.doCommit(failWriteTx.ready());
+ leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
fail("Expected NoShardLeaderException");
} catch (ExecutionException e) {
assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
fail("Expected NoShardLeaderException");
} catch (ExecutionException e) {
assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
shardElectionTimeoutFactor(100));
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
shardElectionTimeoutFactor(100));
- DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+ DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
- leaderTestKit.doCommit(writeTxCohort);
+ leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
+ leaderTestKit.doCommit(successTxCohort);
}
@Test(expected=AskTimeoutException.class)
}
@Test(expected=AskTimeoutException.class)
private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
- Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
- @Override
- public DatastoreContext answer(InvocationOnMock invocation) {
- return newBuilder.build();
- }
- };
+ Answer<DatastoreContext> answer = invocation -> newBuilder.build();
Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
dataStore.onDatastoreContextUpdated(mockContextFactory);
Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
dataStore.onDatastoreContextUpdated(mockContextFactory);