X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=c52e4a39a2bd845a1952051bd1ae774f96b00311;hp=da8c7cb725aad6d4b929d7c300261cff260a4920;hb=e345c2a17f737d537cda45b0f737dff417e3b359;hpb=1a6462c3cdb2cd310ab9503beb18da70f6e3779d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index da8c7cb725..c52e4a39a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -23,10 +23,10 @@ import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; -import akka.pattern.AskTimeoutException; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -39,8 +39,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -66,6 +68,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -115,12 +118,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class }, { ClientBackedDataStore.class } + { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 } }); } - @Parameter + @Parameter(0) public Class testParameter; + @Parameter(1) + public int commitTimeout; private static final String[] CARS_AND_PEOPLE = {"cars", "people"}; private static final String[] CARS = {"cars"}; @@ -194,16 +199,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder); + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder); + followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + + leaderTestKit.waitForMembersUp("member-2"); + followerTestKit.waitForMembersUp("member-1"); } private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) @@ -239,7 +247,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); final String followerCarShardName = "member-2-shard-cars-" + testName; - InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class); DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -277,20 +284,47 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Re-instate the follower member 2 as a single-node to verify replication and recovery. - InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName); + // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and + // applied and all the log entries from the leader. Since we've verified the car data above we know that + // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex. + final AtomicLong leaderLastAppliedIndex = new AtomicLong(); + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0], + state -> leaderLastAppliedIndex.set(state.getLastApplied())); + + // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries. + // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between + // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but + // tell-based persists additional payloads which could be replicated and applied in a batch resulting in + // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries + // until we find the one that encompasses the leader's lastAppliedIndex. + Stopwatch sw = Stopwatch.createStarted(); + boolean done = false; + while (!done) { + final List entries = InMemoryJournal.get(followerCarShardName, + ApplyJournalEntries.class); + for (ApplyJournalEntries aje: entries) { + if (aje.getToIndex() >= leaderLastAppliedIndex.get()) { + done = true; + break; + } + } + + assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex " + + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5); + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); - JavaTestKit.shutdownActorSystem(followerSystem, null, true); + JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE); + JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE); - final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load() - .getConfig("Member2")); + final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2"); - try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder) - .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) { + try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder, + commitTimeout) + .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) { verifyCars(member2Datastore.newReadOnlyTransaction(), car2); } - - JavaTestKit.shutdownActorSystem(newSystem); } @Test @@ -541,8 +575,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); final String testName = "testSingleShardTransactionsWithLeaderChanges"; initDatastoresWithCars(testName); @@ -575,9 +607,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); - IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); + IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout); - try (final AbstractDataStore ds = + try (AbstractDataStore ds = newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { @@ -814,7 +846,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -829,6 +860,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort); @@ -851,10 +886,14 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, - DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100)); - try (final AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100), + commitTimeout); + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { + followerTestKit.waitForMembersUp("member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + // Create and submit a couple tx's so they're pending. DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); @@ -957,10 +996,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderTestKit.doCommit(successTxCohort); } - @Test(expected = AskTimeoutException.class) + @Test public void testTransactionWithShardLeaderNotResponding() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); followerDatastoreContextBuilder.shardElectionTimeoutFactor(50); initDatastoresWithCars("testTransactionWithShardLeaderNotResponding"); @@ -982,19 +1019,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { try { followerTestKit.doCommit(rwTx.ready()); + fail("Exception expected"); } catch (final ExecutionException e) { - assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(), - e.getCause() instanceof ShardLeaderNotRespondingException); - assertNotNull("Expected a nested cause", e.getCause().getCause()); - Throwables.propagateIfInstanceOf(e.getCause().getCause(), Exception.class); - Throwables.propagate(e.getCause().getCause()); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException + || e.getCause() instanceof ShardLeaderNotRespondingException); } } - @Test(expected = NoShardLeaderException.class) + @Test public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader"); // Do an initial read to get the primary shard info cached. @@ -1019,25 +1053,25 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { try { followerTestKit.doCommit(rwTx.ready()); + fail("Exception expected"); } catch (final ExecutionException e) { - Throwables.propagateIfInstanceOf(e.getCause(), Exception.class); - Throwables.propagate(e.getCause()); + final String msg = "Expected instance of NoShardLeaderException, actual: \n" + + Throwables.getStackTraceAsString(e.getCause()); + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); } } @Test public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS); final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); final IntegrationTestKit follower2TestKit = new IntegrationTestKit( - follower2System, follower2DatastoreContextBuilder); + follower2System, follower2DatastoreContextBuilder, commitTimeout); - try (final AbstractDataStore ds = + try (AbstractDataStore ds = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {