+ protected String testActorPath(final String id) {
+ return factory.createTestActorPath(id);
+ }
+
+ protected void verifyLeadersTrimmedLog(final long lastIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyLeadersTrimmedLog(final long lastIndex, final long replicatedToAllIndex) {
+ verifyTrimmedLog("Leader", leaderActor, lastIndex, replicatedToAllIndex);
+ }
+
+ protected void verifyFollowersTrimmedLog(final int num, final TestActorRef<TestRaftActor> actorRef,
+ final long lastIndex) {
+ verifyTrimmedLog("Follower " + num, actorRef, lastIndex, lastIndex - 1);
+ }
+
+ protected void verifyTrimmedLog(final String name, final TestActorRef<TestRaftActor> actorRef, final long lastIndex,
+ final long replicatedToAllIndex) {
+ TestRaftActor actor = actorRef.underlyingActor();
+ RaftActorContext context = actor.getRaftActorContext();
+ long snapshotIndex = lastIndex - 1;
+ assertEquals(name + " snapshot term", snapshotIndex < 0 ? -1 : currentTerm,
+ context.getReplicatedLog().getSnapshotTerm());
+ assertEquals(name + " snapshot index", snapshotIndex, context.getReplicatedLog().getSnapshotIndex());
+ assertEquals(name + " journal log size", 1, context.getReplicatedLog().size());
+ assertEquals(name + " journal last index", lastIndex, context.getReplicatedLog().lastIndex());
+ assertEquals(name + " commit index", lastIndex, context.getCommitIndex());
+ assertEquals(name + " last applied", lastIndex, context.getLastApplied());
+ assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
+ actor.getCurrentBehavior().getReplicatedToAllIndex());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ static void verifyRaftState(final ActorRef raftActor, final Consumer<OnDemandRaftState> verifier) {
+ Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ try {
+ OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+ verifier.accept(raftState);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ lastError = new AssertionError("OnDemandRaftState failed", e);
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;