+ @Test
+ public void testOnRecoveryJournalIsCleaned() {
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("foo")));
+ InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+ ImmutableSet.of("bar")));
+ InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+ InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+
+ // Journal entries up to the last one should've been deleted
+ Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
+ synchronized (journal) {
+ assertEquals("Journal size", 1, journal.size());
+ assertEquals("Journal entry seq #", Long.valueOf(2), journal.keySet().iterator().next());
+ }
+ }};
+ }
+
+ @Test
+ public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
+ final ImmutableSet<String> persistedModules = ImmutableSet.of("foo", "bar");
+ InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+ persistedModules));
+ new JavaTestKit(getSystem()) {{
+ TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(),
+ Props.create(new TestShardManagerCreator(shardMrgIDSuffix)));
+
+ shardManager.underlyingActor().waitForRecoveryComplete();
+
+ Collection<String> knownModules = shardManager.underlyingActor().getKnownModules();
+
+ assertEquals("getKnownModules", persistedModules, Sets.newHashSet(knownModules));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
+
+ assertEquals("getKnownModules size", 0, shardManager.underlyingActor().getKnownModules().size());
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo", "bar"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+ }};
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+ throws Exception {
+ new JavaTestKit(getSystem()) {{
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), newShardMgrProps());
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ moduleIdentifierSet.add(foo);
+
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+
+ //Create a completely different SchemaContext with only the bar module in it
+ //schemaContext = mock(SchemaContext.class);
+ moduleIdentifierSet.clear();
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("getKnownModules", Sets.newHashSet("foo"),
+ Sets.newHashSet(shardManager.underlyingActor().getKnownModules()));
+
+ }};
+ }
+
+ @Test
+ public void testRecoveryApplicable(){
+ new JavaTestKit(getSystem()) {
+ {
+ final Props persistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(true).build());
+ final TestActorRef<ShardManager> persistentShardManager =
+ TestActorRef.create(getSystem(), persistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider1 = persistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
+
+ final Props nonPersistentProps = ShardManager.props(shardMrgIDSuffix,
+ new MockClusterWrapper(),
+ new MockConfiguration(),
+ DatastoreContext.newBuilder().persistent(false).build());
+ final TestActorRef<ShardManager> nonPersistentShardManager =
+ TestActorRef.create(getSystem(), nonPersistentProps);
+
+ DataPersistenceProvider dataPersistenceProvider2 = nonPersistentShardManager.underlyingActor().getDataPersistenceProvider();
+
+ assertFalse("Recovery Not Applicable", dataPersistenceProvider2.isRecoveryApplicable());
+
+
+ }};
+
+ }
+
+ @Test
+ public void testOnUpdateSchemaContextUpdateKnownModulesCallsDataPersistenceProvider()
+ throws Exception {
+ final CountDownLatch persistLatch = new CountDownLatch(1);
+ final Creator<ShardManager> creator = new Creator<ShardManager>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(shardMrgIDSuffix, new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build()) {
+ @Override
+ protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
+ DataPersistenceProviderMonitor dataPersistenceProviderMonitor
+ = new DataPersistenceProviderMonitor();
+ dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+ return dataPersistenceProviderMonitor;
+ }
+ };
+ }
+ };
+
+ new JavaTestKit(getSystem()) {{
+
+ final TestActorRef<ShardManager> shardManager =
+ TestActorRef.create(getSystem(), Props.create(new DelegatingShardManagerCreator(creator)));
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+ moduleIdentifierSet.add(foo);
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ shardManager.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertEquals("Persisted", true,
+ Uninterruptibles.awaitUninterruptibly(persistLatch, 5, TimeUnit.SECONDS));