Merge "Remove SimpleDataTreeCandidate"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.PoisonPill;
13 import akka.dispatch.OnComplete;
14 import com.google.common.base.Preconditions;
15 import javax.annotation.concurrent.GuardedBy;
16 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
18 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
22 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27
28 /**
29  * Proxy class for holding required state to lazily instantiate a listener registration with an
30  * asynchronously-discovered actor.
31  *
32  * @param <T> listener type
33  */
34 final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
35     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
36     private final ActorRef dataChangeListenerActor;
37     private final ActorContext actorContext;
38
39     @GuardedBy("this")
40     private ActorSelection listenerRegistrationActor;
41
42     public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
43         super(listener);
44         this.actorContext = Preconditions.checkNotNull(actorContext);
45         this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
46             DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
47     }
48
49     @Override
50     protected synchronized void removeRegistration() {
51         if (listenerRegistrationActor != null) {
52             listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
53             listenerRegistrationActor = null;
54         }
55
56         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
57     }
58
59     void init(final String shardName, final YangInstanceIdentifier treeId) {
60         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
61         findFuture.onComplete(new OnComplete<ActorRef>() {
62             @Override
63             public void onComplete(final Throwable failure, final ActorRef shard) {
64                 if (failure instanceof LocalShardNotFoundException) {
65                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
66                             "cannot be registered", shardName, getInstance(), treeId);
67                 } else if (failure != null) {
68                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
69                             "cannot be registered: {}", shardName, getInstance(), treeId, failure);
70                 } else {
71                     doRegistration(shard, treeId);
72                 }
73             }
74         }, actorContext.getClientDispatcher());
75     }
76
77     private void setListenerRegistrationActor(final ActorSelection actor) {
78         if (actor == null) {
79             LOG.debug("Ignoring null actor on {}", this);
80             return;
81         }
82
83         synchronized (this) {
84             if (!isClosed()) {
85                 this.listenerRegistrationActor = actor;
86                 return;
87             }
88         }
89
90         // This registration has already been closed, notify the actor
91         actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
92     }
93
94     private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
95
96         Future<Object> future = actorContext.executeOperationAsync(shard,
97                 new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
98                 actorContext.getDatastoreContext().getShardInitializationTimeout());
99
100         future.onComplete(new OnComplete<Object>(){
101             @Override
102             public void onComplete(final Throwable failure, final Object result) {
103                 if (failure != null) {
104                     LOG.error("Failed to register DataTreeChangeListener {} at path {}",
105                             getInstance(), path.toString(), failure);
106                 } else {
107                     RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
108                     setListenerRegistrationActor(actorContext.actorSelection(
109                             reply.getListenerRegistrationPath().path()));
110                 }
111             }
112         }, actorContext.getClientDispatcher());
113     }
114 }