+ private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
+ if(listenerRegistrationActor == null) {
+ return;
+ }
+
+ boolean sendCloseMessage = false;
+ synchronized(this) {
+ if(closed) {
+ sendCloseMessage = true;
+ } else {
+ this.listenerRegistrationActor = listenerRegistrationActor;
+ }
+ }
+
+ if(sendCloseMessage) {
+ listenerRegistrationActor.tell(new
+ CloseDataChangeListenerRegistration().toSerializable(), null);
+ }
+ }
+
+ public void init(final YangInstanceIdentifier path, final AsyncDataBroker.DataChangeScope scope) {
+
+ dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataChangeListener.props(listener));
+
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable failure, ActorRef shard) {
+ if(failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
+ "cannot be registered", shardName, listener, path);
+ } else if(failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
+ "cannot be registered: {}", shardName, listener, path, failure);
+ } else {
+ doRegistration(shard, path, scope);
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+
+ private void doRegistration(ActorRef shard, final YangInstanceIdentifier path,
+ DataChangeScope scope) {
+
+ Future<Object> future = actorContext.executeOperationAsync(shard,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+ future.onComplete(new OnComplete<Object>(){
+ @Override
+ public void onComplete(Throwable failure, Object result) {
+ if(failure != null) {
+ LOG.error("Failed to register DataChangeListener {} at path {}",
+ listener, path.toString(), failure);
+ } else {
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ setListenerRegistrationActor(actorContext.actorSelection(
+ reply.getListenerRegistrationPath()));
+ }
+ }
+ }, actorContext.getActorSystem().dispatcher());
+ }
+