- RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
- }
+ String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
+
+ Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
+
+ //if shard is NOT local
+ if (!shard.isPresent()) {
+ LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
+ return new NoOpDataChangeListenerRegistration(listener);
+ }
+ //if shard is local
+ ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
+ Future future = actorContext.executeOperationAsync(shard.get(),
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));