import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
-import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
// Wait for the commit to be replicated to the follower.
- MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getLastApplied", 0, raftState.getLastApplied());
- }
- });
+ MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
+ raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
- MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getLastApplied", 0, raftState.getLastApplied());
- }
- });
+ MemberNode.verifyRaftState(followerDistributedDataStore, "people",
+ raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
// Disable elections on the leader so it switches to follower.
writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
writeTx.write(CarsModel.newCarPath("optima"), car);
DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
- @Override
- public void verify(ShardStats stats) {
- assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
- }
- });
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
// Gracefully stop the leader via a Shutdown message.
@Test
public void testTransactionWithIsolatedLeader() throws Throwable {
- leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
+ // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
+ leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
String testName = "testTransactionWithIsolatedLeader";
initDatastoresWithCars(testName);
- DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
- failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
+ DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ // Tx that is submitted after the leader transitions to IsolatedLeader.
+ DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+ // Tx that is submitted after the follower is reinstated.
DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ // Stop the follower
followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
followerDistributedDataStore.close();
followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
- MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
- }
- });
+ // Submit the preIsolatedLeaderWriteTx so it's pending
+ DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
+
+ // Change the isolated leader check interval low so it changes to IsolatedLeader.
+ sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+ shardIsolatedLeaderCheckIntervalInMillis(200));
+
+ MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
+ raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
try {
- leaderTestKit.doCommit(failWriteTx.ready());
+ leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
fail("Expected NoShardLeaderException");
} catch (ExecutionException e) {
assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
shardElectionTimeoutFactor(100));
- DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+ DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
- leaderTestKit.doCommit(writeTxCohort);
+ leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
+ leaderTestKit.doCommit(successTxCohort);
}
@Test(expected=AskTimeoutException.class)
private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
- Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
- @Override
- public DatastoreContext answer(InvocationOnMock invocation) {
- return newBuilder.build();
- }
- };
+ Answer<DatastoreContext> answer = invocation -> newBuilder.build();
Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
dataStore.onDatastoreContextUpdated(mockContextFactory);