2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.concurrent.Executor;
19 import org.checkerframework.checker.lock.qual.GuardedBy;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
23 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
24 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Proxy class for holding required state to lazily instantiate a listener registration with an
34 * asynchronously-discovered actor.
36 * <p>The listener type is fixed to {@link DOMDataTreeChangeListener}; this class declares no type parameter.
38 final class DataTreeChangeListenerProxy extends AbstractObjectRegistration<DOMDataTreeChangeListener> {
39 private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
// Actor created in the constructor from DataTreeChangeListenerActor.props(...); it is sent a PoisonPill by
// removeRegistration() and is the actor reference carried in the RegisterDataTreeChangeListener message.
40 private final ActorRef dataChangeListenerActor;
// Gateway used here to create actors, locate the local shard, issue asynchronous operations and resolve
// actor selections. Never null: the constructor checks it with requireNonNull.
41 private final ActorUtils actorUtils;
// Path handed both to the listener actor's props and to the RegisterDataTreeChangeListener message.
// Never null: the constructor checks it with requireNonNull.
42 private final YangInstanceIdentifier registeredPath;
// Forwarded unchanged as the last argument of RegisterDataTreeChangeListener in doRegistration().
43 private final boolean clustered;
// Selection of the remote registration actor. Assigned inside synchronized (this) in
// setListenerRegistrationActor(), cleared by the synchronized removeRegistration(), and read by the
// synchronized getListenerRegistrationActor().
// NOTE(review): GuardedBy is imported by this file but no annotation line is visible above this field in
// this extract - confirm the annotation is present in the real source.
46 private ActorSelection listenerRegistrationActor;
/**
 * Stores the registration parameters and creates the local listener actor.
 *
 * <p>The actor is created on the actor system obtained from {@code actorUtils}, using
 * {@code DataTreeChangeListenerActor.props(getInstance(), registeredPath)} and the notification dispatcher path
 * reported by {@code actorUtils}. The creation is then logged at debug level.
 *
 * <p>NOTE(review): no {@code super(...)} call is visible in this extract; presumably {@code listener} is handed
 * to the superclass so that {@code getInstance()} returns it - confirm against the full source.
 *
 * @param actorUtils actor utilities, must not be null
 * @param listener the listener being registered; in the visible lines it is only used for logging
 * @param registeredPath path the listener is registered on, must not be null
 * @param clustered value later forwarded in the registration message
 * @param shardName not referenced in the visible constructor body
 * @throws NullPointerException if {@code actorUtils} or {@code registeredPath} is null
 */
49 private DataTreeChangeListenerProxy(final ActorUtils actorUtils, final DOMDataTreeChangeListener listener,
50 final YangInstanceIdentifier registeredPath, final boolean clustered, final String shardName) {
52 this.actorUtils = requireNonNull(actorUtils);
53 this.registeredPath = requireNonNull(registeredPath);
54 this.clustered = clustered;
// Spawn the actor that will receive change notifications on behalf of the listener.
55 dataChangeListenerActor = actorUtils.getActorSystem().actorOf(
56 DataTreeChangeListenerActor.props(getInstance(), registeredPath)
57 .withDispatcher(actorUtils.getNotificationDispatcherPath()));
58 LOG.debug("{}: Created actor {} for DTCL {}", actorUtils.getDatastoreContext().getLogicalStoreType(),
59 dataChangeListenerActor, listener);
/**
 * Creates a proxy by delegating to {@code ofTesting(...)} with {@code MoreExecutors.directExecutor()} as the
 * executor; all other arguments are passed through unchanged.
 *
 * @param actorUtils actor utilities
 * @param listener the listener to register
 * @param registeredPath path the listener is registered on
 * @param clustered forwarded to the proxy
 * @param shardName name of the shard to look up
 * @return the proxy returned by {@code ofTesting(...)}
 */
62 static @NonNull DataTreeChangeListenerProxy of(final ActorUtils actorUtils,
63 final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath,
64 final boolean clustered, final String shardName) {
65 return ofTesting(actorUtils, listener, registeredPath, clustered, shardName, MoreExecutors.directExecutor());
/**
 * Creates a proxy and submits a task to {@code executor} that starts discovery of the local shard.
 *
 * <p>The task calls {@code actorUtils.findLocalShardAsync(shardName)} and attaches a completion callback,
 * supplying {@code actorUtils.getClientDispatcher()} to {@code onComplete}. In the callback:
 * <ul>
 *   <li>a {@code LocalShardNotFoundException} failure is logged at debug level;</li>
 *   <li>any other non-null failure is logged at error level together with the failure;</li>
 *   <li>{@code ret.doRegistration(shard)} is the remaining visible action.</li>
 * </ul>
 *
 * <p>NOTE(review): the branch keyword guarding {@code doRegistration}, the closing braces and the return
 * statement are not visible in this extract; presumably registration happens only when there is no failure and
 * the new proxy is returned - confirm against the full source.
 *
 * @param actorUtils actor utilities used for shard lookup
 * @param listener the listener to register
 * @param registeredPath path the listener is registered on
 * @param clustered forwarded to the proxy
 * @param shardName name of the shard to look up
 * @param executor executor on which the discovery task is submitted
 */
69 static @NonNull DataTreeChangeListenerProxy ofTesting(final ActorUtils actorUtils,
70 final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath,
71 final boolean clustered, final String shardName, final Executor executor) {
72 final var ret = new DataTreeChangeListenerProxy(actorUtils, listener, registeredPath, clustered, shardName);
73 executor.execute(() -> {
74 LOG.debug("{}: Starting discovery of shard {}", ret.logContext(), shardName);
75 actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<>() {
77 public void onComplete(final Throwable failure, final ActorRef shard) {
// A missing local shard is reported at debug level only; other failures are reported as errors.
78 if (failure instanceof LocalShardNotFoundException) {
79 LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} cannot be "
80 + "registered", ret.logContext(), shardName, listener, registeredPath);
81 } else if (failure != null) {
82 LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} cannot be "
83 + "registered", ret.logContext(), shardName, listener, registeredPath, failure);
// Hand the discovered shard to the proxy so it can send the registration request.
85 ret.doRegistration(shard);
88 }, actorUtils.getClientDispatcher());
/**
 * Tears down this registration while holding the monitor of {@code this}.
 *
 * <p>If a registration actor selection is currently stored, it is sent
 * {@code CloseDataTreeNotificationListenerRegistration} and the field is reset to null. The local listener actor
 * is sent a {@code PoisonPill} with {@code ActorRef.noSender()} as sender.
 *
 * <p>NOTE(review): the second argument of the first {@code tell} call and the closing brace of the {@code if}
 * are not visible in this extract; presumably the {@code PoisonPill} is sent unconditionally - confirm against
 * the full source.
 */
94 protected synchronized void removeRegistration() {
95 if (listenerRegistrationActor != null) {
96 listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
// Drop the reference after the close message has been sent.
98 listenerRegistrationActor = null;
101 dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Records the selection of the remote registration actor, or tells that actor to close.
 *
 * <p>Visible actions, in order: a debug log about ignoring a null actor; an assignment of {@code actor} to
 * {@code listenerRegistrationActor} inside {@code synchronized (this)}; and a
 * {@code CloseDataTreeNotificationListenerRegistration} message sent to {@code actor}.
 *
 * <p>NOTE(review): the conditions guarding these actions and the early returns are not visible in this extract.
 * Presumably a null {@code actor} is ignored, the assignment happens only while this registration is still open,
 * and the close message is sent otherwise - confirm against the full source.
 *
 * @param actor selection of the registration actor
 */
104 private void setListenerRegistrationActor(final ActorSelection actor) {
106 LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
110 synchronized (this) {
112 listenerRegistrationActor = actor;
117 // This registration has already been closed, notify the actor
// NOTE(review): the sender here is a literal null, while removeRegistration() passes ActorRef.noSender()
// for its PoisonPill - consider using ActorRef.noSender() here for consistency.
118 actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
/**
 * Sends the registration request to the given shard and processes the reply asynchronously.
 *
 * <p>A {@code RegisterDataTreeChangeListener} built from {@code registeredPath}, {@code dataChangeListenerActor}
 * and {@code clustered} is sent through {@code actorUtils.executeOperationAsync}, using the shard initialization
 * timeout from the datastore context. The completion callback is attached with
 * {@code actorUtils.getClientDispatcher()}. A non-null failure is logged at error level. The other visible
 * action casts the result to {@code RegisterDataTreeNotificationListenerReply}, resolves an actor selection from
 * its listener registration path and passes it to {@code setListenerRegistrationActor}.
 *
 * <p>NOTE(review): the {@code else} separating the two actions is not visible in this extract; presumably the
 * reply is processed only when there is no failure - confirm against the full source.
 *
 * @param shard the shard actor to register with
 */
121 private void doRegistration(final ActorRef shard) {
122 actorUtils.executeOperationAsync(shard,
123 new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor, clustered),
124 actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
126 public void onComplete(final Throwable failure, final Object result) {
127 if (failure != null) {
128 LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
129 getInstance(), registeredPath, failure);
// Resolve the registration actor named in the reply and remember it (or close it).
131 setListenerRegistrationActor(actorUtils.actorSelection(
132 ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath()));
135 }, actorUtils.getClientDispatcher());
/**
 * Returns the currently stored registration actor selection, read while holding the monitor of {@code this}.
 *
 * @return the stored selection, or null if none is stored
 */
139 synchronized ActorSelection getListenerRegistrationActor() {
140 return listenerRegistrationActor;
/**
 * Returns the local listener actor created by the constructor.
 *
 * @return the listener actor reference
 */
144 ActorRef getDataChangeListenerActor() {
145 return dataChangeListenerActor;
/**
 * Builds the prefix used in this class's log messages.
 *
 * @return the string form of the datastore context's logical store type
 */
148 private String logContext() {
149 return actorUtils.getDatastoreContext().getLogicalStoreType().toString();