+ @Test
+ public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
+ throws Exception {
+ new JavaTestKit(system) {{
+ final Props props = ShardManager
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
+ final TestActorRef<ShardManager> subject =
+ TestActorRef.create(system, props);
+
+ Collection<String> knownModules = subject.underlyingActor().getKnownModules();
+
+ assertEquals(0, knownModules.size());
+
+ SchemaContext schemaContext = mock(SchemaContext.class);
+ Set<ModuleIdentifier> moduleIdentifierSet = new HashSet<>();
+
+ ModuleIdentifier foo = mock(ModuleIdentifier.class);
+ when(foo.getNamespace()).thenReturn(new URI("foo"));
+
+ moduleIdentifierSet.add(foo);
+
+ when(schemaContext.getAllModuleIdentifiers()).thenReturn(moduleIdentifierSet);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertTrue(knownModules.contains("foo"));
+
+ assertEquals(1, knownModules.size());
+
+ //Create a completely different SchemaContext with only the bar module in it
+ schemaContext = mock(SchemaContext.class);
+ moduleIdentifierSet = new HashSet<>();
+ ModuleIdentifier bar = mock(ModuleIdentifier.class);
+ when(bar.getNamespace()).thenReturn(new URI("bar"));
+
+ moduleIdentifierSet.add(bar);
+
+ subject.underlyingActor().onReceiveCommand(new UpdateSchemaContext(schemaContext));
+
+ assertFalse(knownModules.contains("bar"));
+
+ assertEquals(1, knownModules.size());
+
+ }};
+
+ }
+
+
+ private void sleep(long period){
+ Uninterruptibles.sleepUninterruptibly(period, TimeUnit.MILLISECONDS);
+ }
+
+ public static class MyJournal extends AsyncWriteJournal {
+
+ private static Map<Long, Object> journal = Maps.newTreeMap();
+
+ public static void addToJournal(Long sequenceNr, Object value){
+ journal.put(sequenceNr, value);
+ }
+
+ public static Map<Long, Object> get(){
+ return journal;
+ }
+
+ public static void clear(){
+ journal.clear();
+ }
+
+ @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
+ final Procedure<PersistentRepr> replayCallback) {
+ if(journal.size() == 0){
+ return Futures.successful(null);
+ }
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (Map.Entry<Long, Object> entry : journal.entrySet()) {
+ PersistentRepr persistentMessage =
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+ false, null, null);
+ replayCallback.apply(persistentMessage);
+ }
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override public Future<Long> doAsyncReadHighestSequenceNr(String s, long l) {
+ return Futures.successful(-1L);
+ }
+
+ @Override public Future<Void> doAsyncWriteMessages(
+ final Iterable<PersistentRepr> persistentReprs) {
+ return Futures.future(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (PersistentRepr repr : persistentReprs){
+ if(repr.payload() instanceof ShardManager.SchemaContextModules) {
+ journal.put(repr.sequenceNr(), repr.payload());
+ }
+ }
+ return null;
+ }
+ }, context().dispatcher());
+ }
+
+ @Override public Future<Void> doAsyncWriteConfirmations(
+ Iterable<PersistentConfirmation> persistentConfirmations) {
+ return Futures.successful(null);
+ }
+
+ @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
+ boolean b) {
+ clear();
+ return Futures.successful(null);
+ }
+
+ @Override public Future<Void> doAsyncDeleteMessagesTo(String s, long l, boolean b) {
+ clear();
+ return Futures.successful(null);
+ }
+ }