2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.collect.Iterables;
16 import java.util.ArrayDeque;
17 import java.util.Collection;
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.LinkedHashMap;
22 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
23 import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
32 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
33 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
37 private final int shardCount;
39 // Initial messages, retaining order in which we have received them
40 private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
41 private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
43 private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
44 super(listener, YangInstanceIdentifier.empty());
45 this.shardCount = shardCount;
49 void onInitialData(final OnInitialData message) {
50 final ActorRef sender = getSender();
51 verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
53 final Object prev = initialMessages.put(sender, message);
54 verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
55 checkInitialConvergence();
59 void dataTreeChanged(final DataTreeChanged message) {
60 if (initialMessages == null) {
61 super.dataTreeChanged(message);
63 processMessage(message);
67 private void processMessage(final DataTreeChanged message) {
68 // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
69 // it to other messages for later processing.
70 if (initialMessages.putIfAbsent(getSender(), message) == null) {
71 checkInitialConvergence();
73 otherMessages.addLast(message);
77 private void checkInitialConvergence() {
78 if (initialMessages.size() != shardCount) {
79 // We do not have initial state from all shards yet
84 * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
85 * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
86 * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
88 * 1) non-default shards contain immediate children of root from one module
89 * 2) default shard contains everything else
90 * 3) there is no overlap between shards
92 * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
93 * view from each shard's perspective, but it does not reflect the aggregate reality.
95 * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
96 * reported initial state reports, report that node as written and then report any additional deltas.
98 final Deque<DataTreeCandidate> initialChanges = new ArrayDeque<>();
99 final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
100 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
101 for (Object message : initialMessages.values()) {
102 if (message instanceof DataTreeChanged) {
103 final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
104 final DataTreeCandidate initial;
105 if (changes.size() != 1) {
106 final Iterator<DataTreeCandidate> it = changes.iterator();
108 // Append to changes to report as initial. This should not be happening (often?).
109 it.forEachRemaining(initialChanges::addLast);
111 initial = Iterables.get(changes, 0);
114 final NormalizedNode<?, ?> root = initial.getRootNode().getDataAfter().orElseThrow();
115 verify(root instanceof ContainerNode, "Unexpected root node %s", root);
116 ((ContainerNode) root).getValue().forEach(rootBuilder::withChild);
119 // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
120 initialMessages = null;
122 // Prepend combined initial changed and report initial changes and clear the map
123 initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
124 DataTreeCandidateNodes.written(rootBuilder.build())));
125 super.dataTreeChanged(new DataTreeChanged(initialChanges));
127 // Now go through all messages we have held back and report them. Note we are removing them from the queue
128 // to allow them to be reclaimed as soon as possible.
129 for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
130 super.dataTreeChanged(message);
132 otherMessages = null;
135 static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
136 return Props.create(RootDataTreeChangeListenerActor.class, instance, shardCount);