2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.OnComplete;
18 import com.google.common.collect.Maps;
19 import java.util.ArrayList;
20 import java.util.HashMap;
21 import java.util.List;
23 import java.util.Map.Entry;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
29 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
33 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 final class RootDataTreeChangeListenerProxy<L extends DOMDataTreeChangeListener>
39 extends AbstractListenerRegistration<L> {
40 private abstract static class State {
44 private static final class ResolveShards extends State {
45 final Map<String, Object> localShards = new HashMap<>();
48 ResolveShards(final int shardCount) {
49 this.shardCount = shardCount;
53 private static final class Subscribed extends State {
54 final List<ActorSelection> subscriptions;
55 final ActorRef dtclActor;
57 Subscribed(final ActorRef dtclActor, final int shardCount) {
58 this.dtclActor = requireNonNull(dtclActor);
59 subscriptions = new ArrayList<>(shardCount);
63 private static final class Terminated extends State {
67 private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class);
69 private final ActorUtils actorUtils;
74 RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener,
75 final Set<String> shardNames) {
77 this.actorUtils = requireNonNull(actorUtils);
78 this.state = new ResolveShards(shardNames.size());
80 for (String shardName : shardNames) {
81 actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
83 public void onComplete(final Throwable failure, final ActorRef success) {
84 onFindLocalShardComplete(shardName, failure, success);
86 }, actorUtils.getClientDispatcher());
91 protected synchronized void removeRegistration() {
92 if (state instanceof Terminated) {
93 // Trivial case: we have already terminated on a failure, so this is a no-op
94 } else if (state instanceof ResolveShards) {
95 // Simple case: just mark the fact we were closed, terminating when resolution finishes
96 state = new Terminated();
97 } else if (state instanceof Subscribed) {
98 terminate((Subscribed) state);
100 throw new IllegalStateException("Unhandled close in state " + state);
104 private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure,
105 final ActorRef shard) {
106 if (state instanceof ResolveShards) {
107 localShardsResolved((ResolveShards) state, shardName, failure, shard);
109 LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state);
114 private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure,
115 final ActorRef shard) {
116 final Object result = failure != null ? failure : verifyNotNull(shard);
117 LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result);
118 current.localShards.put(shardName, result);
120 if (current.localShards.size() == current.shardCount) {
121 // We have all the responses we need
122 if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) {
123 reportFailure(current.localShards);
125 subscribeToShards(current.localShards);
131 private void reportFailure(final Map<String, Object> localShards) {
132 for (Entry<String, Object> entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) {
133 final Throwable cause = (Throwable) entry.getValue();
134 LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(),
135 getInstance(), cause);
137 state = new Terminated();
141 private void subscribeToShards(final Map<String, Object> localShards) {
142 // Safety check before we start doing anything
143 for (Entry<String, Object> entry : localShards.entrySet()) {
144 final Object obj = entry.getValue();
145 verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey());
148 // Instantiate the DTCL actor and update state
149 final ActorRef dtclActor = actorUtils.getActorSystem().actorOf(
150 RootDataTreeChangeListenerActor.props(getInstance(), localShards.size())
151 .withDispatcher(actorUtils.getNotificationDispatcherPath()));
152 state = new Subscribed(dtclActor, localShards.size());
154 // Subscribe to all shards
155 final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener(
156 YangInstanceIdentifier.empty(), dtclActor, true);
157 for (Entry<String, Object> entry : localShards.entrySet()) {
158 // Do not retain references to localShards
159 final String shardName = entry.getKey();
160 final ActorRef shard = (ActorRef) entry.getValue();
162 actorUtils.executeOperationAsync(shard, regMessage,
163 actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
165 public void onComplete(final Throwable failure, final Object result) {
166 onShardSubscribed(shardName, failure, result);
168 }, actorUtils.getClientDispatcher());
172 private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) {
173 if (state instanceof Subscribed) {
174 final Subscribed current = (Subscribed) state;
175 if (failure != null) {
176 LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(),
177 shardName,getInstance(), failure);
180 onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result);
183 terminateSubscription(shardName, failure, result);
188 private void onSuccessfulSubscription(final Subscribed current, final String shardName,
189 final RegisterDataTreeNotificationListenerReply reply) {
190 final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath());
191 LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor);
192 current.subscriptions.add(regActor);
196 private void terminate(final Subscribed current) {
197 // Terminate the listener
198 current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
199 // Terminate all subscriptions
200 for (ActorSelection regActor : current.subscriptions) {
201 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
203 state = new Terminated();
206 // This method should not modify internal state
207 private void terminateSubscription(final String shardName, final Throwable failure, final Object result) {
208 if (failure == null) {
209 final ActorSelection regActor = actorUtils.actorSelection(
210 ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath());
211 LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName,
213 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
215 LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure);
219 private String logContext() {
220 return actorUtils.getDatastoreContext().getLogicalStoreType().toString();