- private void updateSchemaContext(Object message) {
- for(ShardInformation info : localShards.values()){
- info.getActor().tell(message,getSelf());
+ private void updateSchemaContext(final Object message) {
+ final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+ Set<String> newModules = new HashSet<>(128);
+
+ for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+ String s = moduleIdentifier.getNamespace().toString();
+ newModules.add(s);
+ }
+
+ if(newModules.containsAll(knownModules)) {
+
+ LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
+
+ knownModules = ImmutableSet.copyOf(newModules);
+
+ dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+ @Override
+ public void apply(SchemaContextModules param) throws Exception {
+ LOG.debug("Sending new SchemaContext to Shards");
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() == null) {
+ info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext),
+ info.getShardId().toString()));
+ } else {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }
+
+ });
+ } else {
+ LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
+ newModules, knownModules);