Separate out RaftEntryMeta
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeChangeListenerProxy.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.PoisonPill;
15 import akka.dispatch.OnComplete;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.concurrent.Executor;
19 import org.checkerframework.checker.lock.qual.GuardedBy;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
23 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
24 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Proxy class for holding required state to lazily instantiate a listener registration with an
34  * asynchronously-discovered actor.
35  *
36  * @param <T> listener type
37  */
38 final class DataTreeChangeListenerProxy extends AbstractObjectRegistration<DOMDataTreeChangeListener> {
39     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
40     private final ActorRef dataChangeListenerActor;
41     private final ActorUtils actorUtils;
42     private final YangInstanceIdentifier registeredPath;
43     private final boolean clustered;
44
45     @GuardedBy("this")
46     private ActorSelection listenerRegistrationActor;
47
48     @VisibleForTesting
49     private DataTreeChangeListenerProxy(final ActorUtils actorUtils, final DOMDataTreeChangeListener listener,
50             final YangInstanceIdentifier registeredPath, final boolean clustered, final String shardName) {
51         super(listener);
52         this.actorUtils = requireNonNull(actorUtils);
53         this.registeredPath = requireNonNull(registeredPath);
54         this.clustered = clustered;
55         dataChangeListenerActor = actorUtils.getActorSystem().actorOf(
56                 DataTreeChangeListenerActor.props(getInstance(), registeredPath)
57                     .withDispatcher(actorUtils.getNotificationDispatcherPath()));
58         LOG.debug("{}: Created actor {} for DTCL {}", actorUtils.getDatastoreContext().getLogicalStoreType(),
59                 dataChangeListenerActor, listener);
60     }
61
62     static @NonNull DataTreeChangeListenerProxy of(final ActorUtils actorUtils,
63             final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath,
64             final boolean clustered, final String shardName) {
65         return ofTesting(actorUtils, listener, registeredPath, clustered, shardName, MoreExecutors.directExecutor());
66     }
67
68     @VisibleForTesting
69     static @NonNull DataTreeChangeListenerProxy ofTesting(final ActorUtils actorUtils,
70             final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath,
71             final boolean clustered, final String shardName, final Executor executor) {
72         final var ret = new DataTreeChangeListenerProxy(actorUtils, listener, registeredPath, clustered, shardName);
73         executor.execute(() -> {
74             LOG.debug("{}: Starting discovery of shard {}", ret.logContext(), shardName);
75             actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<>() {
76                 @Override
77                 public void onComplete(final Throwable failure, final ActorRef shard) {
78                     if (failure instanceof LocalShardNotFoundException) {
79                         LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} cannot be "
80                             + "registered", ret.logContext(), shardName, listener, registeredPath);
81                     } else if (failure != null) {
82                         LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} cannot be "
83                             + "registered", ret.logContext(), shardName, listener, registeredPath, failure);
84                     } else {
85                         ret.doRegistration(shard);
86                     }
87                 }
88             }, actorUtils.getClientDispatcher());
89         });
90         return ret;
91     }
92
93     @Override
94     protected synchronized void removeRegistration() {
95         if (listenerRegistrationActor != null) {
96             listenerRegistrationActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(),
97                     ActorRef.noSender());
98             listenerRegistrationActor = null;
99         }
100
101         dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
102     }
103
104     private void setListenerRegistrationActor(final ActorSelection actor) {
105         if (actor == null) {
106             LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
107             return;
108         }
109
110         synchronized (this) {
111             if (!isClosed()) {
112                 listenerRegistrationActor = actor;
113                 return;
114             }
115         }
116
117         // This registration has already been closed, notify the actor
118         actor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), null);
119     }
120
121     private void doRegistration(final ActorRef shard) {
122         actorUtils.executeOperationAsync(shard,
123             new RegisterDataTreeChangeListener(registeredPath, dataChangeListenerActor, clustered),
124             actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
125                 @Override
126                 public void onComplete(final Throwable failure, final Object result) {
127                     if (failure != null) {
128                         LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
129                             getInstance(), registeredPath, failure);
130                     } else {
131                         setListenerRegistrationActor(actorUtils.actorSelection(
132                             ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath()));
133                     }
134                 }
135             }, actorUtils.getClientDispatcher());
136     }
137
138     @VisibleForTesting
139     synchronized ActorSelection getListenerRegistrationActor() {
140         return listenerRegistrationActor;
141     }
142
143     @VisibleForTesting
144     ActorRef getDataChangeListenerActor() {
145         return dataChangeListenerActor;
146     }
147
148     private String logContext() {
149         return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
150     }
151 }