Add support for root DTCL listening on all shards in DS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RootDataTreeChangeListenerActor.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RootDataTreeChangeListenerActor.java
new file mode 100644 (file)
index 0000000..963dea9
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.collect.Iterables;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
+    private final int shardCount;
+
+    // Initial messages, retaining order in which we have received them
+    private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
+    private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
+
+    private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
+        super(listener, YangInstanceIdentifier.empty());
+        this.shardCount = shardCount;
+    }
+
+    @Override
+    void onInitialData(final OnInitialData message) {
+        final ActorRef sender = getSender();
+        verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
+
+        final Object prev = initialMessages.put(sender, message);
+        verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
+        checkInitialConvergence();
+    }
+
+    @Override
+    void dataTreeChanged(final DataTreeChanged message) {
+        if (initialMessages == null) {
+            super.dataTreeChanged(message);
+        } else {
+            processMessage(message);
+        }
+    }
+
+    private void processMessage(final DataTreeChanged message) {
+        // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
+        // it to other messages for later processing.
+        if (initialMessages.putIfAbsent(getSender(), message) == null) {
+            checkInitialConvergence();
+        } else {
+            otherMessages.addLast(message);
+        }
+    }
+
+    private void checkInitialConvergence() {
+        if (initialMessages.size() != shardCount) {
+            // We do not have initial state from all shards yet
+            return;
+        }
+
+        /*
+         * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
+         * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
+         * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
+         *
+         * 1) non-default shards contain immediate children of root from one module
+         * 2) default shard contains everything else
+         * 3) there is no overlap between shards
+         *
+         * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
+         * view from each shard's perspective, but it does not reflect the aggregate reality.
+         *
+         * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
+         * reported initial state reports, report that node as written and then report any additional deltas.
+         */
+        final Deque<DataTreeCandidate> initialChanges = new ArrayDeque<>();
+        final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
+                .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
+        for (Object message : initialMessages.values()) {
+            if (message instanceof DataTreeChanged) {
+                final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
+                final DataTreeCandidate initial;
+                if (changes.size() != 1) {
+                    final Iterator<DataTreeCandidate> it = changes.iterator();
+                    initial = it.next();
+                    // Append to changes to report as initial. This should not be happening (often?).
+                    it.forEachRemaining(initialChanges::addLast);
+                } else {
+                    initial = Iterables.get(changes, 0);
+                }
+
+                final NormalizedNode<?, ?> root = initial.getRootNode().getDataAfter().orElseThrow();
+                verify(root instanceof ContainerNode, "Unexpected root node %s", root);
+                ((ContainerNode) root).getValue().forEach(rootBuilder::withChild);
+            }
+        }
+        // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
+        initialMessages = null;
+
+        // Prepend combined initial changed and report initial changes and clear the map
+        initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
+            DataTreeCandidateNodes.written(rootBuilder.build())));
+        super.dataTreeChanged(new DataTreeChanged(initialChanges));
+
+        // Now go through all messages we have held back and report them. Note we are removing them from the queue
+        // to allow them to be reclaimed as soon as possible.
+        for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
+            super.dataTreeChanged(message);
+        }
+        otherMessages = null;
+    }
+
+    static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
+        return Props.create(RootDataTreeChangeListenerActor.class, shardCount);
+    }
+}