- public void testOnRecoveryJournalIsEmptied(){
- MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
- ImmutableSet.of("foo")));
-
- assertEquals(1, MyJournal.get().size());
-
- new JavaTestKit(system) {{
- final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
-
- final ActorRef subject = getSystem().actorOf(props);
-
- // Send message to check that ShardManager is ready
- subject.tell(new FindPrimary("unknown").toSerializable(), getRef());
-
- expectMsgClass(duration("3 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
-
- assertEquals(0, MyJournal.get().size());
+ public void testOnRecoveryJournalIsCleaned() {
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+ InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("bar")));
+ InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+ InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+ synchronized (journal) {
+ assertEquals("Journal size", 1, journal.size());
+ assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+ }