import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
initDatastoresWithCars(testName);
final String followerCarShardName = "member-2-shard-cars-" + testName;
- InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class);
DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// Re-instate the follower member 2 as a single-node to verify replication and recovery.
- InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+ // The following is a bit tricky. Before we reinstate the follower we need to ensure it has persisted and
+ // applied and all the log entries from the leader. Since we've verified the car data above we know that
+ // all the transactions have been applied on the leader so we first read and capture its lastAppliedIndex.
+ final AtomicLong leaderLastAppliedIndex = new AtomicLong();
+ IntegrationTestKit.verifyShardState(leaderDistributedDataStore, CARS[0],
+ state -> leaderLastAppliedIndex.set(state.getLastApplied()));
+
+ // Now we need to make sure the follower has persisted the leader's lastAppliedIndex via ApplyJournalEntries.
+ // However we don't know exactly how many ApplyJournalEntries messages there will be as it can differ between
+ // the tell-based and ask-based front-ends. For ask-based there will be exactly 2 ApplyJournalEntries but
+ // tell-based persists additional payloads which could be replicated and applied in a batch resulting in
+ // either 2 or 3 ApplyJournalEntries. To handle this we read the follower's persisted ApplyJournalEntries
+ // until we find the one that encompasses the leader's lastAppliedIndex.
+ Stopwatch sw = Stopwatch.createStarted();
+ boolean done = false;
+ while (!done) {
+ final List<ApplyJournalEntries> entries = InMemoryJournal.get(followerCarShardName,
+ ApplyJournalEntries.class);
+ for (ApplyJournalEntries aje: entries) {
+ if (aje.getToIndex() >= leaderLastAppliedIndex.get()) {
+ done = true;
+ break;
+ }
+ }
+
+ assertTrue("Follower did not persist ApplyJournalEntries containing leader's lastAppliedIndex "
+ + leaderLastAppliedIndex + ". Entries persisted: " + entries, sw.elapsed(TimeUnit.SECONDS) <= 5);
+
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
- JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+ JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
- final ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load()
- .getConfig("Member2"));
+ final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
try (final AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
- .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
+ .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
}
-
- JavaTestKit.shutdownActorSystem(newSystem);
}
@Test