FollowerTest.testCaptureSnapshotOnLastEntryInAppendEntries:1152 Persisted journal entries size: [] expected:<1> but was:<0>
The test waits on the deletion of journal entries after the snapshot is saved
to occur and then checks the persistent journal for the remaining
ApplyJournalEntries. But occasionally the persisting of the ApplyJournalEntries
message occurs after the deletion so the assertion fails b/c the
ApplyJournalEntries wasn;t persisted yet. This is a little odd b/c the
sequencing in the raft code is that the ApplyJournalEntries write is done
before the delete so it should also be observed the same way in the
InMemoryJournal, even though it doesn't really matter either way.
To alleviate the problem I added a wait for the ApplyJournalEntries
message in the journal in the 3 similar tests.
I also made a couple other minor changes that I observed while running the
tests.
Change-Id: I67cbb8fd79c91cd1cc23c363b78e7f5e9b9f2bbe
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
+import akka.actor.InvalidActorNameException;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
*/
@SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId) {
*/
@SuppressWarnings("unchecked")
public <T extends Actor> TestActorRef<T> createTestActor(Props props, String actorId) {
- TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
- return (TestActorRef<T>) addActor(actorRef, true);
+ InvalidActorNameException lastError = null;
+ for (int i = 0; i < 10; i++) {
+ try {
+ TestActorRef<T> actorRef = TestActorRef.create(system, props, actorId);
+ return (TestActorRef<T>) addActor(actorRef, true);
+ } catch (InvalidActorNameException e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.slf4j.LoggerFactory;
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
protected void logStart(String name) {
}
protected void logStart(String name) {
- LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
+ LoggerFactory.getLogger(getClass()).info("Starting " + name);
}
protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
}
protected RaftPolicy createRaftPolicy(final boolean automaticElectionsEnabled,
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 1 has already been applied.
// We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 1 has already been applied.
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 2 has already been applied.
// We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 2 has already been applied.
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+ InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
List<ReplicatedLogEntry> entries = Arrays.asList(
newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
InMemoryJournal.waitForDeleteMessagesComplete(id);
+ InMemoryJournal.waitForWriteMessagesComplete(id);
// We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 0 has already been applied.
// We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
// persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
// This is OK - on recovery it will be a no-op since index 0 has already been applied.
public static void clear() {
JOURNALS.clear();
public static void clear() {
JOURNALS.clear();
+ DELETE_MESSAGES_COMPLETE_LATCHES.clear();
+ WRITE_MESSAGES_COMPLETE.clear();
+ BLOCK_READ_MESSAGES_LATCHES.clear();
}
@SuppressWarnings("unchecked")
}
@SuppressWarnings("unchecked")