/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.dispatch.Futures;
-import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class }, { ClientBackedDataStore.class }
+ { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 }
});
}
- @Parameter
+ @Parameter(0)
public Class<? extends AbstractDataStore> testParameter;
+ @Parameter(1)
+ public int commitTimeout;
private static final String[] CARS_AND_PEOPLE = {"cars", "people"};
private static final String[] CARS = {"cars"};
private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards)
throws Exception {
- leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+ leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout);
leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
testParameter, type, moduleShardsConfig, false, shards);
- followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+ followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout);
followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
testParameter, type, moduleShardsConfig, false, shards);
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
+
+ leaderTestKit.waitForMembersUp("member-2");
+ followerTestKit.waitForMembersUp("member-1");
}
private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
initDatastoresWithCars(testName);
final String followerCarShardName = "member-2-shard-cars-" + testName;
- InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Re-instate the follower member 2 as a single-node to verify replication and recovery.
- InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+ // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+ // applied and all the log entries from the leader. Since we've verified the car data above we know that
+ // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+ final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+ IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+ state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+ // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+ // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+ // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+ // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+ // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+ // until we find the one that encompasses the leader's lastAppliedIndex.
+ Stopwatch sw = Stopwatch.createStarted();
+ boolean done = false;
+ while (!done) {
+ final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+ ApplyJournalEntries.class);
+ for (ApplyJournalEntries aje: entries) {
+ if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+ done = true;
+ break;
+ }
+ }
+
+ assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+ + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
- JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+ JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
- final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
- .getConfig("Member2"));
+ final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
- try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
- .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+ try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
+ commitTimeout)
+ .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
}
-
- JavaTestKit.shutdownActorSystem(newSystem);
}
@Test
@Test
public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
final String testName = "testSingleShardTransactionsWithLeaderChanges";
initDatastoresWithCars(testName);
final DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder()
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
- IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
+ IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
- try (final AbstractDataStore ds =
+ try (AbstractDataStore ds =
newMember1TestKit.setupAbstractDataStore(
testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
.shardElectionTimeoutFactor(10));
- Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars");
// Submit all tx's - the messages should get queued for retry.
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
.customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1));
+ IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars")
+ .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+ IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people")
+ .tell(TimeoutNow.INSTANCE, ActorRef.noSender());
followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort);
followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort);
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
- DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
- try (final AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100),
+ commitTimeout);
+ try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
+ followerTestKit.waitForMembersUp("member-3");
+ follower2TestKit.waitForMembersUp("member-1", "member-2");
+
// Create and submit a couple tx's so they're pending.
DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
leaderTestKit.doCommit(successTxCohort);
}
- @Test(expected = AskTimeoutException.class)
+ @Test
public void testTransactionWithShardLeaderNotResponding() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
try {
followerTestKit.doCommit(rwTx.ready());
+ fail("Exception expected");
} catch (final ExecutionException e) {
- assertTrue("Expected ShardLeaderNotRespondingException cause. Actual: " + e.getCause(),
- e.getCause() instanceof ShardLeaderNotRespondingException);
- assertNotNull("Expected a nested cause", e.getCause().getCause());
- Throwables.propagateIfInstanceOf(e.getCause().getCause(), Exception.class);
- Throwables.propagate(e.getCause().getCause());
+ final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+ assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
+ || e.getCause() instanceof ShardLeaderNotRespondingException);
}
}
- @Test(expected = NoShardLeaderException.class)
+ @Test
public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
// Do an initial read to get the primary shard info cached.
try {
followerTestKit.doCommit(rwTx.ready());
+ fail("Exception expected");
} catch (final ExecutionException e) {
- Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
- Throwables.propagate(e.getCause());
+ final String msg = "Expected instance of NoShardLeaderException, actual: \n"
+ + Throwables.getStackTraceAsString(e.getCause());
+ assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
}
}
@Test
public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
- follower2System, follower2DatastoreContextBuilder);
+ follower2System, follower2DatastoreContextBuilder, commitTimeout);
- try (final AbstractDataStore ds =
+ try (AbstractDataStore ds =
follower2TestKit.setupAbstractDataStore(
testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {