2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import org.checkerframework.checker.lock.qual.GuardedBy;
19 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
21 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
22 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
33 * Proxy class for holding required state to lazily instantiate a listener registration with an
34 * asynchronously-discovered actor.
36 * @param <T> listener type
38 final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
39 private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
40 private final ActorRef dataChangeListenerActor;
41 private final ActorUtils actorUtils;
42 private final YangInstanceIdentifier registeredPath;
45 private ActorSelection listenerRegistrationActor;
47 DataTreeChangeListenerProxy(final ActorUtils actorUtils, final T listener,
48 final YangInstanceIdentifier registeredPath) {
50 this.actorUtils = requireNonNull(actorUtils);
51 this.registeredPath = requireNonNull(registeredPath);
52 this.dataChangeListenerActor = actorUtils.getActorSystem().actorOf(
53 DataTreeChangeListenerActor.props(getInstance(), registeredPath)
54 .withDispatcher(actorUtils.getNotificationDispatcherPath()));
56 LOG.debug("{}: Created actor {} for DTCL {}", actorUtils.getDatastoreContext().getLogicalStoreType(),
57 dataChangeListenerActor, listener);
61 protected synchronized void removeRegistration() {
62 if (listenerRegistrationActor != null) {
63 listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
65 listenerRegistrationActor = null;
68 dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
71 void init(final String shardName) {
72 Future<ActorRef> findFuture = actorUtils.findLocalShardAsync(shardName);
73 findFuture.onComplete(new OnComplete<ActorRef>() {
75 public void onComplete(final Throwable failure, final ActorRef shard) {
76 if (failure instanceof LocalShardNotFoundException) {
77 LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} "
78 + "cannot be registered", logContext(), shardName, getInstance(), registeredPath);
79 } else if (failure != null) {
80 LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} "
81 + "cannot be registered", logContext(), shardName, getInstance(), registeredPath,
84 doRegistration(shard);
87 }, actorUtils.getClientDispatcher());
90 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
91 justification = "https://github.com/spotbugs/spotbugs/issues/811")
92 private void setListenerRegistrationActor(final ActorSelection actor) {
94 LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
100 this.listenerRegistrationActor = actor;
105 // This registration has already been closed, notify the actor
106 actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
109 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
110 justification = "https://github.com/spotbugs/spotbugs/issues/811")
111 private void doRegistration(final ActorRef shard) {
113 Future<Object> future = actorUtils.executeOperationAsync(shard,
114 new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
115 getInstance() instanceof ClusteredDOMDataTreeChangeListener),
116 actorUtils.getDatastoreContext().getShardInitializationTimeout());
118 future.onComplete(new OnComplete<Object>() {
120 public void onComplete(final Throwable failure, final Object result) {
121 if (failure != null) {
122 LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
123 getInstance(), registeredPath, failure);
125 RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
126 setListenerRegistrationActor(actorUtils.actorSelection(
127 reply.getListenerRegistrationPath()));
130 }, actorUtils.getClientDispatcher());
134 synchronized ActorSelection getListenerRegistrationActor() {
135 return listenerRegistrationActor;
139 ActorRef getDataChangeListenerActor() {
140 return dataChangeListenerActor;
143 private String logContext() {
144 return actorUtils.getDatastoreContext().getLogicalStoreType().toString();