Deprecate common.util.Arguments for removal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
21 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
22 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31
32 /**
33  * Proxy class for holding required state to lazily instantiate a listener registration with an
34  * asynchronously-discovered actor.
35  *
36  * @param <T> listener type
37  */
38 final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
39     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
40     private final ActorRef dataChangeListenerActor;
41     private final ActorUtils actorUtils;
42     private final YangInstanceIdentifier registeredPath;
43
44     @GuardedBy("this")
45     private ActorSelection listenerRegistrationActor;
46
47     DataTreeChangeListenerProxy(final ActorUtils actorUtils, final T listener,
48             final YangInstanceIdentifier registeredPath) {
49         super(listener);
50         this.actorUtils = requireNonNull(actorUtils);
51         this.registeredPath = requireNonNull(registeredPath);
52         this.dataChangeListenerActor = actorUtils.getActorSystem().actorOf(
53                 DataTreeChangeListenerActor.props(getInstance(), registeredPath)
54                     .withDispatcher(actorUtils.getNotificationDispatcherPath()));
55
56         LOG.debug("{}: Created actor {} for DTCL {}", actorUtils.getDatastoreContext().getLogicalStoreType(),
57                 dataChangeListenerActor, listener);
58     }
59
60     @Override
61     protected synchronized void removeRegistration() {
62         if (listenerRegistrationActor != null) {
63             listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
64                     ActorRef.noSender());
65             listenerRegistrationActor = null;
66         }
67
68         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
69     }
70
71     void init(final String shardName) {
72         Future<ActorRef> findFuture = actorUtils.findLocalShardAsync(shardName);
73         findFuture.onComplete(new OnComplete<ActorRef>() {
74             @Override
75             public void onComplete(final Throwable failure, final ActorRef shard) {
76                 if (failure instanceof LocalShardNotFoundException) {
77                     LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} "
78                             + "cannot be registered", logContext(), shardName, getInstance(), registeredPath);
79                 } else if (failure != null) {
80                     LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} "
81                             + "cannot be registered", logContext(), shardName, getInstance(), registeredPath,
82                             failure);
83                 } else {
84                     doRegistration(shard);
85                 }
86             }
87         }, actorUtils.getClientDispatcher());
88     }
89
90     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
91             justification = "https://github.com/spotbugs/spotbugs/issues/811")
92     private void setListenerRegistrationActor(final ActorSelection actor) {
93         if (actor == null) {
94             LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
95             return;
96         }
97
98         synchronized (this) {
99             if (!isClosed()) {
100                 this.listenerRegistrationActor = actor;
101                 return;
102             }
103         }
104
105         // This registration has already been closed, notify the actor
106         actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
107     }
108
109     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
110             justification = "https://github.com/spotbugs/spotbugs/issues/811")
111     private void doRegistration(final ActorRef shard) {
112
113         Future<Object> future = actorUtils.executeOperationAsync(shard,
114                 new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
115                         getInstance() instanceof ClusteredDOMDataTreeChangeListener),
116                 actorUtils.getDatastoreContext().getShardInitializationTimeout());
117
118         future.onComplete(new OnComplete<Object>() {
119             @Override
120             public void onComplete(final Throwable failure, final Object result) {
121                 if (failure != null) {
122                     LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
123                             getInstance(), registeredPath, failure);
124                 } else {
125                     RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
126                     setListenerRegistrationActor(actorUtils.actorSelection(
127                             reply.getListenerRegistrationPath()));
128                 }
129             }
130         }, actorUtils.getClientDispatcher());
131     }
132
133     @VisibleForTesting
134     synchronized ActorSelection getListenerRegistrationActor() {
135         return listenerRegistrationActor;
136     }
137
138     @VisibleForTesting
139     ActorRef getDataChangeListenerActor() {
140         return dataChangeListenerActor;
141     }
142
143     private String logContext() {
144         return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
145     }
146 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.