+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.collect.Maps;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RootDataTreeChangeListenerProxy<L extends DOMDataTreeChangeListener>
+ extends AbstractListenerRegistration<L> {
+ private abstract static class State {
+
+ }
+
+ private static final class ResolveShards extends State {
+ final Map<String, Object> localShards = new HashMap<>();
+ final int shardCount;
+
+ ResolveShards(final int shardCount) {
+ this.shardCount = shardCount;
+ }
+ }
+
+ private static final class Subscribed extends State {
+ final List<ActorSelection> subscriptions;
+ final ActorRef dtclActor;
+
+ Subscribed(final ActorRef dtclActor, final int shardCount) {
+ this.dtclActor = requireNonNull(dtclActor);
+ subscriptions = new ArrayList<>(shardCount);
+ }
+ }
+
+ private static final class Terminated extends State {
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class);
+
+ private final ActorUtils actorUtils;
+
+ @GuardedBy("this")
+ private State state;
+
+ RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener,
+ final Set<String> shardNames) {
+ super(listener);
+ this.actorUtils = requireNonNull(actorUtils);
+ this.state = new ResolveShards(shardNames.size());
+
+ for (String shardName : shardNames) {
+ actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef success) {
+ onFindLocalShardComplete(shardName, failure, success);
+ }
+ }, actorUtils.getClientDispatcher());
+ }
+ }
+
+ @Override
+ protected synchronized void removeRegistration() {
+ if (state instanceof Terminated) {
+ // Trivial case: we have already terminated on a failure, so this is a no-op
+ } else if (state instanceof ResolveShards) {
+ // Simple case: just mark the fact we were closed, terminating when resolution finishes
+ state = new Terminated();
+ } else if (state instanceof Subscribed) {
+ terminate((Subscribed) state);
+ } else {
+ throw new IllegalStateException("Unhandled close in state " + state);
+ }
+ }
+
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure,
+ final ActorRef shard) {
+ if (state instanceof ResolveShards) {
+ localShardsResolved((ResolveShards) state, shardName, failure, shard);
+ } else {
+ LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state);
+ }
+ }
+
+ @Holding("this")
+ private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure,
+ final ActorRef shard) {
+ final Object result = failure != null ? failure : verifyNotNull(shard);
+ LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result);
+ current.localShards.put(shardName, result);
+
+ if (current.localShards.size() == current.shardCount) {
+ // We have all the responses we need
+ if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) {
+ reportFailure(current.localShards);
+ } else {
+ subscribeToShards(current.localShards);
+ }
+ }
+ }
+
+ @Holding("this")
+ private void reportFailure(final Map<String, Object> localShards) {
+ for (Entry<String, Object> entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) {
+ final Throwable cause = (Throwable) entry.getValue();
+ LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(),
+ getInstance(), cause);
+ }
+ state = new Terminated();
+ }
+
+ @Holding("this")
+ private void subscribeToShards(final Map<String, Object> localShards) {
+ // Safety check before we start doing anything
+ for (Entry<String, Object> entry : localShards.entrySet()) {
+ final Object obj = entry.getValue();
+ verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey());
+ }
+
+ // Instantiate the DTCL actor and update state
+ final ActorRef dtclActor = actorUtils.getActorSystem().actorOf(
+ RootDataTreeChangeListenerActor.props(getInstance(), localShards.size())
+ .withDispatcher(actorUtils.getNotificationDispatcherPath()));
+ state = new Subscribed(dtclActor, localShards.size());
+
+ // Subscribe to all shards
+ final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener(
+ YangInstanceIdentifier.empty(), dtclActor, true);
+ for (Entry<String, Object> entry : localShards.entrySet()) {
+ // Do not retain references to localShards
+ final String shardName = entry.getKey();
+ final ActorRef shard = (ActorRef) entry.getValue();
+
+ actorUtils.executeOperationAsync(shard, regMessage,
+ actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object result) {
+ onShardSubscribed(shardName, failure, result);
+ }
+ }, actorUtils.getClientDispatcher());
+ }
+ }
+
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) {
+ if (state instanceof Subscribed) {
+ final Subscribed current = (Subscribed) state;
+ if (failure != null) {
+ LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(),
+ shardName,getInstance(), failure);
+ terminate(current);
+ } else {
+ onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result);
+ }
+ } else {
+ terminateSubscription(shardName, failure, result);
+ }
+ }
+
+ @Holding("this")
+ private void onSuccessfulSubscription(final Subscribed current, final String shardName,
+ final RegisterDataTreeNotificationListenerReply reply) {
+ final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath());
+ LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor);
+ current.subscriptions.add(regActor);
+ }
+
+ @Holding("this")
+ private void terminate(final Subscribed current) {
+ // Terminate the listener
+ current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ // Terminate all subscriptions
+ for (ActorSelection regActor : current.subscriptions) {
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+ }
+ state = new Terminated();
+ }
+
+ // This method should not modify internal state
+ private void terminateSubscription(final String shardName, final Throwable failure, final Object result) {
+ if (failure == null) {
+ final ActorSelection regActor = actorUtils.actorSelection(
+ ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath());
+ LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName,
+ regActor);
+ regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+ } else {
+ LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure);
+ }
+ }
+
+ private String logContext() {
+ return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
+ }
+}