- public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
- return new ListenerRegistrationProxy();
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ ListenerRegistration<L> registerChangeListener(
+ YangInstanceIdentifier path, L listener,
+ AsyncDataBroker.DataChangeScope scope) {
+
+ Preconditions.checkNotNull(path, "path should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+
+ ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props(schemaContext,listener,path ));
+
+ String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
+
+ Object result = actorContext.executeLocalShardOperation(shardName,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ ActorContext.ASK_DURATION);
+
+ if (result != null) {
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ return new DataChangeListenerRegistrationProxy(actorContext
+ .actorSelection(reply.getListenerRegistrationPath()), listener,
+ dataChangeListenerActor);
+ }
+
+ LOG.debug(
+ "No local shard for shardName {} was found so returning a noop registration",
+ shardName);
+
+ return new NoOpDataChangeListenerRegistration(listener);