/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import org.checkerframework.checker.lock.qual.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
19 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
21 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
22 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
23 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
25 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
32 * Proxy class for holding required state to lazily instantiate a listener registration with an
33 * asynchronously-discovered actor.
35 * @param <T> listener type
37 final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
38 private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
39 private final ActorRef dataChangeListenerActor;
40 private final ActorUtils actorUtils;
41 private final YangInstanceIdentifier registeredPath;
44 private ActorSelection listenerRegistrationActor;
46 DataTreeChangeListenerProxy(final ActorUtils actorUtils, final T listener,
47 final YangInstanceIdentifier registeredPath) {
49 this.actorUtils = requireNonNull(actorUtils);
50 this.registeredPath = requireNonNull(registeredPath);
51 this.dataChangeListenerActor = actorUtils.getActorSystem().actorOf(
52 DataTreeChangeListenerActor.props(getInstance(), registeredPath)
53 .withDispatcher(actorUtils.getNotificationDispatcherPath()));
55 LOG.debug("{}: Created actor {} for DTCL {}", actorUtils.getDatastoreContext().getLogicalStoreType(),
56 dataChangeListenerActor, listener);
60 protected synchronized void removeRegistration() {
61 if (listenerRegistrationActor != null) {
62 listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
64 listenerRegistrationActor = null;
67 dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
70 void init(final String shardName) {
71 Future<ActorRef> findFuture = actorUtils.findLocalShardAsync(shardName);
72 findFuture.onComplete(new OnComplete<ActorRef>() {
74 public void onComplete(final Throwable failure, final ActorRef shard) {
75 if (failure instanceof LocalShardNotFoundException) {
76 LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} "
77 + "cannot be registered", logContext(), shardName, getInstance(), registeredPath);
78 } else if (failure != null) {
79 LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} "
80 + "cannot be registered", logContext(), shardName, getInstance(), registeredPath,
83 doRegistration(shard);
86 }, actorUtils.getClientDispatcher());
89 private void setListenerRegistrationActor(final ActorSelection actor) {
91 LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
97 this.listenerRegistrationActor = actor;
102 // This registration has already been closed, notify the actor
103 actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
106 private void doRegistration(final ActorRef shard) {
108 Future<Object> future = actorUtils.executeOperationAsync(shard,
109 new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor,
110 getInstance() instanceof ClusteredDOMDataTreeChangeListener),
111 actorUtils.getDatastoreContext().getShardInitializationTimeout());
113 future.onComplete(new OnComplete<Object>() {
115 public void onComplete(final Throwable failure, final Object result) {
116 if (failure != null) {
117 LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
118 getInstance(), registeredPath, failure);
120 RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)result;
121 setListenerRegistrationActor(actorUtils.actorSelection(
122 reply.getListenerRegistrationPath()));
125 }, actorUtils.getClientDispatcher());
129 synchronized ActorSelection getListenerRegistrationActor() {
130 return listenerRegistrationActor;
134 ActorRef getDataChangeListenerActor() {
135 return dataChangeListenerActor;
138 private String logContext() {
139 return actorUtils.getDatastoreContext().getLogicalStoreType().toString();