verifyOuterListEntry(shard, 1);
- verifyLastApplied(shard, 2);
+ verifyLastApplied(shard, 5);
}
};
}
inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+ // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
+ // the journal
+ Thread.sleep(HEARTBEAT_MILLIS * 2);
+
shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
// Commit index should advance as we do not have an empty
// modification
- assertEquals(0, shardStats.getCommitIndex());
+ assertEquals(1, shardStats.getCommitIndex());
}
};
}
waitUntilLeader(shard);
- final TestActorRef<MessageCollectorActor> listener =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
shard.tell(new RegisterRoleChangeListener(), listener);
@Test
public void testServerRemoved() throws Exception {
- final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
final ActorRef shard = parent.underlyingActor().context().actorOf(
newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),