BUG-5280: do not cache modify responses
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.PoisonPill;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Preconditions;
16 import javax.annotation.concurrent.GuardedBy;
17 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
24 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29
30 /**
31  * Proxy class for holding required state to lazily instantiate a listener registration with an
32  * asynchronously-discovered actor.
33  *
34  * @param <T> listener type
35  */
36 final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
37     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
38     private final ActorRef dataChangeListenerActor;
39     private final ActorContext actorContext;
40
41     @GuardedBy("this")
42     private ActorSelection listenerRegistrationActor;
43
44     DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
45         super(listener);
46         this.actorContext = Preconditions.checkNotNull(actorContext);
47         this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
48                 DataTreeChangeListenerActor.props(getInstance())
49                     .withDispatcher(actorContext.getNotificationDispatcherPath()));
50     }
51
52     @Override
53     protected synchronized void removeRegistration() {
54         if (listenerRegistrationActor != null) {
55             listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
56             listenerRegistrationActor = null;
57         }
58
59         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
60     }
61
62     void init(final String shardName, final YangInstanceIdentifier treeId) {
63         Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
64         findFuture.onComplete(new OnComplete<ActorRef>() {
65             @Override
66             public void onComplete(final Throwable failure, final ActorRef shard) {
67                 if (failure instanceof LocalShardNotFoundException) {
68                     LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
69                             + "cannot be registered", shardName, getInstance(), treeId);
70                 } else if (failure != null) {
71                     LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
72                             + "cannot be registered: {}", shardName, getInstance(), treeId, failure);
73                 } else {
74                     doRegistration(shard, treeId);
75                 }
76             }
77         }, actorContext.getClientDispatcher());
78     }
79
80     private void setListenerRegistrationActor(final ActorSelection actor) {
81         if (actor == null) {
82             LOG.debug("Ignoring null actor on {}", this);
83             return;
84         }
85
86         synchronized (this) {
87             if (!isClosed()) {
88                 this.listenerRegistrationActor = actor;
89                 return;
90             }
91         }
92
93         // This registration has already been closed, notify the actor
94         actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
95     }
96
97     private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
98
99         Future<Object> future = actorContext.executeOperationAsync(shard,
100                 new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
101                         getInstance() instanceof ClusteredDOMDataTreeChangeListener),
102                 actorContext.getDatastoreContext().getShardInitializationTimeout());
103
104         future.onComplete(new OnComplete<Object>() {
105             @Override
106             public void onComplete(final Throwable failure, final Object result) {
107                 if (failure != null) {
108                     LOG.error("Failed to register DataTreeChangeListener {} at path {}",
109                             getInstance(), path.toString(), failure);
110                 } else {
111                     RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
112                     setListenerRegistrationActor(actorContext.actorSelection(
113                             reply.getListenerRegistrationPath()));
114                 }
115             }
116         }, actorContext.getClientDispatcher());
117     }
118
119     @VisibleForTesting
120     synchronized ActorSelection getListenerRegistrationActor() {
121         return listenerRegistrationActor;
122     }
123
124     @VisibleForTesting
125     ActorRef getDataChangeListenerActor() {
126         return dataChangeListenerActor;
127     }
128 }