}
protected boolean isLeaderActive() {
- return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+ return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+ !isLeadershipTransferInProgress();
+ }
+
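+ // A transfer only counts as in progress while the cohort is actively transferring,
+ // so a finished or aborted transfer no longer blocks isLeaderActive().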
+ private boolean isLeadershipTransferInProgress() {
+ return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
/**
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
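+ // True while a leadership transfer is underway; set when the transfer starts and cleared in finish().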
+ private boolean isTransferring;
RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
this.raftActor = raftActor;
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
if(behavior instanceof Leader) {
+ isTransferring = true;
((Leader)behavior).transferLeadership(this);
} else {
LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
}
private void finish(boolean success) {
+ isTransferring = false;
if(transferTimer.isRunning()) {
transferTimer.stop();
if(success) {
onCompleteCallbacks.add(onComplete);
}
+ boolean isTransferring() {
+ return isTransferring;
+ }
+
@VisibleForTesting
void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
return commitCoordinator.getQueueSize();
}
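+ // Delegates to the commit coordinator; reported by ShardStats as the tx cohort cache size.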
+ public int getCohortCacheSize() {
+ return commitCoordinator.getCohortCacheSize();
+ }
+
@Override
protected Optional<ActorRef> getRoleChangeNotifier() {
return roleChangeNotifier;
// the primary/leader shard. However, with timing and caching on the front end, there's a small

// window where it could have a stale leader during leadership transitions.
//
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(batched, getSender(),
"Could not commit transaction " + batched.getTransactionID());
} else {
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
}
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
"Could not commit transaction " + message.getTransactionID());
} else {
private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionID());
- boolean isIsolatedLeader = isIsolatedLeader();
- if (isLeader() && !isIsolatedLeader) {
+ boolean isLeaderActive = isLeaderActive();
+ if (isLeader() && isLeaderActive) {
commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
} else {
ActorSelection leader = getLeader();
- if (isIsolatedLeader || leader == null) {
+ if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
"Could not commit transaction " + forwardedReady.getTransactionID());
} else {
}
}
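+ // Invoked when the RaftActor needs to pause leader activity (e.g. prior to a leadership transfer);
+ // the operation is deferred until all pending transaction commits have completed.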
+ @Override
+ protected void pauseLeader(Runnable operation) {
+ LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ commitCoordinator.setRunOnPendingTransactionsComplete(operation);
+ }
+
@Override
public String persistenceId() {
return this.name;
private ReadyTransactionReply readyTransactionReply;
+ private Runnable runOnPendingTransactionsComplete;
+
ShardCommitCoordinator(ShardDataTree dataTree,
long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
return queuedCohortEntries.size();
}
+ int getCohortCacheSize() {
+ return cohortCache.size();
+ }
+
void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
iter.remove();
cohortCache.remove(next.getTransactionID());
}
+
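+ // Entries may have been removed above, so check whether a registered
+ // pending-transactions-complete operation can now run.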
+ maybeRunOperationOnPendingTransactionsComplete();
}
void cleanupExpiredCohortEntries() {
maybeProcessNextCohortEntry();
}
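+ // Registers an operation to run once there are no pending cohort entries;
+ // it runs immediately if the queue is already empty.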
+ void setRunOnPendingTransactionsComplete(Runnable operation) {
+ runOnPendingTransactionsComplete = operation;
+ maybeRunOperationOnPendingTransactionsComplete();
+ }
+
+ private void maybeRunOperationOnPendingTransactionsComplete() {
+ if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
+
@VisibleForTesting
void setCohortDecorator(CohortDecorator cohortDecorator) {
this.cohortDecorator = cohortDecorator;
return shard.getPendingTxCommitQueueSize();
}
+ public int getTxCohortCacheSize() {
+ return shard.getCohortCacheSize();
+ }
+
@Override
public void captureSnapshot() {
if(shard != null) {
int getPendingTxCommitQueueSize();
- void captureSnapshot();
+ int getTxCohortCacheSize();
+ void captureSnapshot();
}
import akka.cluster.Cluster;
import akka.dispatch.Futures;
import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
/**
* End-to-end distributed data store tests that exercise remote shards and transactions.
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
}
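+ // Verifies a graceful leader shutdown: transactions pending when the Shutdown message is sent
+ // should still commit, and the committed data should be visible on the remaining nodes.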
+ @Test
+ public void testLeadershipTransferOnShutdown() throws Exception {
+ leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
+ String testName = "testLeadershipTransferOnShutdown";
+ initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
+
+ IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
+ DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+ MODULE_SHARDS_CARS_PEOPLE_1_2_3, false);
+
+ // Create and submit a couple of tx's so they're pending.
+
+ DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+ DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+ @Override
+ public void verify(ShardStats stats) {
+ assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
+ }
+ });
+
+ writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+ MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ writeTx.write(CarsModel.newCarPath("optima"), car);
+ DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
+
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
+ @Override
+ public void verify(ShardStats stats) {
+ assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
+ }
+ });
+
+ // Gracefully stop the leader via a Shutdown message.
+
+ FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS);
+ Future<ActorRef> future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars");
+ ActorRef leaderActor = Await.result(future, duration);
+
+ Future<Boolean> stopFuture = Patterns.gracefulStop(leaderActor, duration, new Shutdown());
+
+ // Commit the 2 transactions. They should finish and succeed.
+
+ followerTestKit.doCommit(cohort1);
+ followerTestKit.doCommit(cohort2);
+
+ // Wait for the leader actor to stop.
+
+ Boolean stopped = Await.result(stopFuture, duration);
+ assertEquals("Stopped", Boolean.TRUE, stopped);
+
+ // Verify leadership was transferred by reading the committed data from the other nodes.
+
+ verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car);
+ verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car);
+ }
+
@Test
public void testTransactionWithIsolatedLeader() throws Throwable {
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);