Add support for root DTCL listening on all shards in DS 04/88804/15
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 19 May 2020 14:37:47 +0000 (16:37 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 21 May 2020 11:17:58 +0000 (13:17 +0200)
Devide DTCLProxy into DTCLSingleShardProxy, DTCLMultiShardProxy and
DTCLPrefixShardProxy to address the different registration mechanisms
used in all three cases.

JIRA: CONTROLLER-1932
Change-Id: I48732577f26fa5844b69a2feaddb02fe53909da7
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/AbstractDOMBrokerWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java [new file with mode: 0644]

index 2e66571cea712bb50804a9f4b72b5415203e33f5..39703f0f67b9168d5d400c223b0f59fe939ad50f 100644 (file)
@@ -84,8 +84,10 @@ public abstract class AbstractDOMBrokerWriteTransaction<T extends DOMStoreWriteT
             final NormalizedNode<?, ?> data) {
         checkArgument(data != null, "Attempted to store null data at %s", path);
         final PathArgument lastArg = path.getLastPathArgument();
-        checkArgument(lastArg == data.getIdentifier() || lastArg != null && lastArg.equals(data.getIdentifier()),
+        if (lastArg != null) {
+            checkArgument(lastArg.equals(data.getIdentifier()),
                 "Instance identifier references %s but data identifier is %s", lastArg, data);
+        }
     }
 
     @Override
index cddf234ffc47274947956d84a76322aa9bc7d174..8537c85698fa6a2759c224da3a7488d8c24dd95d 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorRef;
@@ -17,6 +18,7 @@ import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -159,6 +161,21 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         requireNonNull(treeId, "treeId should not be null");
         requireNonNull(listener, "listener should not be null");
 
+        /*
+         * We need to potentially deal with multi-shard composition for registration targeting the root of the data
+         * store. If that is the case, we delegate to a more complicated setup invol
+         */
+        if (treeId.isEmpty()) {
+            // User is targeting root of the datastore. If there is more than one shard, we have to register with them
+            // all and perform data composition.
+            final Set<String> shardNames = actorUtils.getConfiguration().getAllShardNames();
+            if (shardNames.size() > 1) {
+                checkArgument(listener instanceof ClusteredDOMDataTreeChangeListener,
+                    "Cannot listen on root without non-clustered listener %s", listener);
+                return new RootDataTreeChangeListenerProxy<>(actorUtils, listener, shardNames);
+            }
+        }
+
         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
 
@@ -169,7 +186,6 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return listenerRegistrationProxy;
     }
 
-
     @Override
     public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
             final DOMDataTreeIdentifier subtree, final C cohort) {
index 8df53b276ad79cd3c37f2ad1338f4693559c2740..5533fc8fad683756861305a45589115b422e4800 100644 (file)
@@ -24,25 +24,26 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
  * DataTreeChanged messages and dispatching their context to the user.
  */
-final class DataTreeChangeListenerActor extends AbstractUntypedActor {
+class DataTreeChangeListenerActor extends AbstractUntypedActor {
     private final DOMDataTreeChangeListener listener;
     private final YangInstanceIdentifier registeredPath;
+
     private boolean notificationsEnabled = false;
     private long notificationCount;
     private String logContext = "";
 
-    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
+    DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
             final YangInstanceIdentifier registeredPath) {
         this.listener = requireNonNull(listener);
         this.registeredPath = requireNonNull(registeredPath);
     }
 
     @Override
-    protected void handleReceive(final Object message) {
+    protected final void handleReceive(final Object message) {
         if (message instanceof DataTreeChanged) {
-            dataChanged((DataTreeChanged)message);
+            dataTreeChanged((DataTreeChanged) message);
         } else if (message instanceof OnInitialData) {
-            onInitialData();
+            onInitialData((OnInitialData) message);
         } else if (message instanceof EnableNotification) {
             enableNotification((EnableNotification) message);
         } else if (message instanceof GetInfo) {
@@ -54,7 +55,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void onInitialData() {
+    void onInitialData(final OnInitialData message) {
         LOG.debug("{}: Notifying onInitialData to listener {}", logContext, listener);
 
         try {
@@ -65,7 +66,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void dataChanged(final DataTreeChanged message) {
+    void dataTreeChanged(final DataTreeChanged message) {
         // Do nothing if notifications are not enabled
         if (!notificationsEnabled) {
             LOG.debug("{}: Notifications not enabled for listener {} - dropping change notification",
@@ -99,7 +100,7 @@ final class DataTreeChangeListenerActor extends AbstractUntypedActor {
                 listener);
     }
 
-    public static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
+    static Props props(final DOMDataTreeChangeListener listener, final YangInstanceIdentifier registeredPath) {
         return Props.create(DataTreeChangeListenerActor.class, listener, registeredPath);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java
new file mode 100644 (file)
index 0000000..963dea9
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.collect.Iterables;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
+    private final int shardCount;
+
+    // Initial messages, retaining order in which we have received them
+    private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
+    private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
+
+    private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
+        super(listener, YangInstanceIdentifier.empty());
+        this.shardCount = shardCount;
+    }
+
+    @Override
+    void onInitialData(final OnInitialData message) {
+        final ActorRef sender = getSender();
+        verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
+
+        final Object prev = initialMessages.put(sender, message);
+        verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
+        checkInitialConvergence();
+    }
+
+    @Override
+    void dataTreeChanged(final DataTreeChanged message) {
+        if (initialMessages == null) {
+            super.dataTreeChanged(message);
+        } else {
+            processMessage(message);
+        }
+    }
+
+    private void processMessage(final DataTreeChanged message) {
+        // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
+        // it to other messages for later processing.
+        if (initialMessages.putIfAbsent(getSender(), message) == null) {
+            checkInitialConvergence();
+        } else {
+            otherMessages.addLast(message);
+        }
+    }
+
+    private void checkInitialConvergence() {
+        if (initialMessages.size() != shardCount) {
+            // We do not have initial state from all shards yet
+            return;
+        }
+
+        /*
+         * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
+         * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
+         * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
+         *
+         * 1) non-default shards contain immediate children of root from one module
+         * 2) default shard contains everything else
+         * 3) there is no overlap between shards
+         *
+         * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
+         * view from each shard's perspective, but it does not reflect the aggregate reality.
+         *
+         * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
+         * reported initial state reports, report that node as written and then report any additional deltas.
+         */
+        final Deque<DataTreeCandidate> initialChanges = new ArrayDeque<>();
+        final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
+        for (Object message : initialMessages.values()) {
+            if (message instanceof DataTreeChanged) {
+                final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
+                final DataTreeCandidate initial;
+                if (changes.size() != 1) {
+                    final Iterator<DataTreeCandidate> it = changes.iterator();
+                    initial = it.next();
+                    // Append to changes to report as initial. This should not be happening (often?).
+                    it.forEachRemaining(initialChanges::addLast);
+                } else {
+                    initial = Iterables.get(changes, 0);
+                }
+
+                final NormalizedNode<?, ?> root = initial.getRootNode().getDataAfter().orElseThrow();
+                verify(root instanceof ContainerNode, "Unexpected root node %s", root);
+                ((ContainerNode) root).getValue().forEach(rootBuilder::withChild);
+            }
+        }
+        // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
+        initialMessages = null;
+
+        // Prepend combined initial changed and report initial changes and clear the map
+        initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
+            DataTreeCandidateNodes.written(rootBuilder.build())));
+        super.dataTreeChanged(new DataTreeChanged(initialChanges));
+
+        // Now go through all messages we have held back and report them. Note we are removing them from the queue
+        // to allow them to be reclaimed as soon as possible.
+        for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
+            super.dataTreeChanged(message);
+        }
+        otherMessages = null;
+    }
+
+    static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
+        return Props.create(RootDataTreeChangeListenerActor.class, shardCount);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerProxy.java
new file mode 100644 (file)
index 0000000..6f4a5f1
--- /dev/null
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.collect.Maps;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RootDataTreeChangeListenerProxy<L extends DOMDataTreeChangeListener>
+        extends AbstractListenerRegistration<L> {
+    private abstract static class State {
+
+    }
+
+    private static final class ResolveShards extends State {
+        final Map<String, Object> localShards = new HashMap<>();
+        final int shardCount;
+
+        ResolveShards(final int shardCount) {
+            this.shardCount = shardCount;
+        }
+    }
+
+    private static final class Subscribed extends State {
+        final List<ActorSelection> subscriptions;
+        final ActorRef dtclActor;
+
+        Subscribed(final ActorRef dtclActor, final int shardCount) {
+            this.dtclActor = requireNonNull(dtclActor);
+            subscriptions = new ArrayList<>(shardCount);
+        }
+    }
+
+    private static final class Terminated extends State {
+
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(RootDataTreeChangeListenerProxy.class);
+
+    private final ActorUtils actorUtils;
+
+    @GuardedBy("this")
+    private State state;
+
+    RootDataTreeChangeListenerProxy(final ActorUtils actorUtils, final @NonNull L listener,
+            final Set<String> shardNames) {
+        super(listener);
+        this.actorUtils = requireNonNull(actorUtils);
+        this.state = new ResolveShards(shardNames.size());
+
+        for (String shardName : shardNames) {
+            actorUtils.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(final Throwable failure, final ActorRef success) {
+                    onFindLocalShardComplete(shardName, failure, success);
+                }
+            }, actorUtils.getClientDispatcher());
+        }
+    }
+
+    @Override
+    protected synchronized void removeRegistration() {
+        if (state instanceof Terminated) {
+            // Trivial case: we have already terminated on a failure, so this is a no-op
+        } else if (state instanceof ResolveShards) {
+            // Simple case: just mark the fact we were closed, terminating when resolution finishes
+            state = new Terminated();
+        } else if (state instanceof Subscribed) {
+            terminate((Subscribed) state);
+        } else {
+            throw new IllegalStateException("Unhandled close in state " + state);
+        }
+    }
+
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private synchronized void onFindLocalShardComplete(final String shardName, final Throwable failure,
+            final ActorRef shard) {
+        if (state instanceof ResolveShards) {
+            localShardsResolved((ResolveShards) state, shardName, failure, shard);
+        } else {
+            LOG.debug("{}: lookup for shard {} turned into a noop on state {}", logContext(), shardName, state);
+        }
+    }
+
+    @Holding("this")
+    private void localShardsResolved(final ResolveShards current, final String shardName, final Throwable failure,
+            final ActorRef shard) {
+        final Object result = failure != null ? failure : verifyNotNull(shard);
+        LOG.debug("{}: lookup for shard {} resulted in {}", logContext(), shardName, result);
+        current.localShards.put(shardName, result);
+
+        if (current.localShards.size() == current.shardCount) {
+            // We have all the responses we need
+            if (current.localShards.values().stream().anyMatch(Throwable.class::isInstance)) {
+                reportFailure(current.localShards);
+            } else {
+                subscribeToShards(current.localShards);
+            }
+        }
+    }
+
+    @Holding("this")
+    private void reportFailure(final Map<String, Object> localShards) {
+        for (Entry<String, Object> entry : Maps.filterValues(localShards, Throwable.class::isInstance).entrySet()) {
+            final Throwable cause = (Throwable) entry.getValue();
+            LOG.error("{}: Failed to find local shard {}, cannot register {} at root", logContext(), entry.getKey(),
+                getInstance(), cause);
+        }
+        state = new Terminated();
+    }
+
+    @Holding("this")
+    private void subscribeToShards(final Map<String, Object> localShards) {
+        // Safety check before we start doing anything
+        for (Entry<String, Object> entry : localShards.entrySet()) {
+            final Object obj = entry.getValue();
+            verify(obj instanceof ActorRef, "Unhandled response %s for shard %s", obj, entry.getKey());
+        }
+
+        // Instantiate the DTCL actor and update state
+        final ActorRef dtclActor = actorUtils.getActorSystem().actorOf(
+            RootDataTreeChangeListenerActor.props(getInstance(), localShards.size())
+              .withDispatcher(actorUtils.getNotificationDispatcherPath()));
+        state = new Subscribed(dtclActor, localShards.size());
+
+        // Subscribe to all shards
+        final RegisterDataTreeChangeListener regMessage = new RegisterDataTreeChangeListener(
+            YangInstanceIdentifier.empty(), dtclActor, true);
+        for (Entry<String, Object> entry : localShards.entrySet()) {
+            // Do not retain references to localShards
+            final String shardName = entry.getKey();
+            final ActorRef shard = (ActorRef) entry.getValue();
+
+            actorUtils.executeOperationAsync(shard, regMessage,
+                actorUtils.getDatastoreContext().getShardInitializationTimeout()).onComplete(new OnComplete<>() {
+                    @Override
+                    public void onComplete(final Throwable failure, final Object result) {
+                        onShardSubscribed(shardName, failure, result);
+                    }
+                }, actorUtils.getClientDispatcher());
+        }
+    }
+
+    @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+            justification = "https://github.com/spotbugs/spotbugs/issues/811")
+    private synchronized void onShardSubscribed(final String shardName, final Throwable failure, final Object result) {
+        if (state instanceof Subscribed) {
+            final Subscribed current = (Subscribed) state;
+            if (failure != null) {
+                LOG.error("{}: Shard {} failed to subscribe, terminating listener {}", logContext(),
+                    shardName,getInstance(), failure);
+                terminate(current);
+            } else {
+                onSuccessfulSubscription(current, shardName, (RegisterDataTreeNotificationListenerReply) result);
+            }
+        } else {
+            terminateSubscription(shardName, failure, result);
+        }
+    }
+
+    @Holding("this")
+    private void onSuccessfulSubscription(final Subscribed current, final String shardName,
+            final RegisterDataTreeNotificationListenerReply reply) {
+        final ActorSelection regActor = actorUtils.actorSelection(reply.getListenerRegistrationPath());
+        LOG.debug("{}: Shard {} subscribed at {}", logContext(), shardName, regActor);
+        current.subscriptions.add(regActor);
+    }
+
+    @Holding("this")
+    private void terminate(final Subscribed current) {
+        // Terminate the listener
+        current.dtclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        // Terminate all subscriptions
+        for (ActorSelection regActor : current.subscriptions) {
+            regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+        }
+        state = new Terminated();
+    }
+
+    // This method should not modify internal state
+    private void terminateSubscription(final String shardName, final Throwable failure, final Object result) {
+        if (failure == null) {
+            final ActorSelection regActor = actorUtils.actorSelection(
+                ((RegisterDataTreeNotificationListenerReply) result).getListenerRegistrationPath());
+            LOG.debug("{}: Shard {} registered late, terminating subscription at {}", logContext(), shardName,
+                regActor);
+            regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), ActorRef.noSender());
+        } else {
+            LOG.debug("{}: Shard {} reported late failure", logContext(), shardName, failure);
+        }
+    }
+
+    private String logContext() {
+        return actorUtils.getDatastoreContext().getLogicalStoreType().toString();
+    }
+}