2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.collect.Iterables;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Deque;
20 import java.util.Iterator;
21 import java.util.LinkedHashMap;
22 import java.util.List;
24 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
25 import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
32 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
33 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidateNodes;
35 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
39 private final int shardCount;
41 // Initial messages, retaining order in which we have received them
42 private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
43 private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
45 private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
46 super(listener, YangInstanceIdentifier.empty());
47 this.shardCount = shardCount;
51 void onInitialData(final OnInitialData message) {
52 final ActorRef sender = getSender();
53 verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
55 final Object prev = initialMessages.put(sender, message);
56 verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
57 checkInitialConvergence();
61 void dataTreeChanged(final DataTreeChanged message) {
62 if (initialMessages == null) {
63 super.dataTreeChanged(message);
65 processMessage(message);
69 private void processMessage(final DataTreeChanged message) {
70 // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
71 // it to other messages for later processing.
72 if (initialMessages.putIfAbsent(getSender(), message) == null) {
73 checkInitialConvergence();
75 otherMessages.addLast(message);
79 private void checkInitialConvergence() {
80 if (initialMessages.size() != shardCount) {
81 // We do not have initial state from all shards yet
86 * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
87 * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
88 * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
90 * 1) non-default shards contain immediate children of root from one module
91 * 2) default shard contains everything else
92 * 3) there is no overlap between shards
94 * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
95 * view from each shard's perspective, but it does not reflect the aggregate reality.
97 * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
98 * reported initial state reports, report that node as written and then report any additional deltas.
100 final List<DataTreeCandidate> initialChanges = new ArrayList<>();
101 // Reserve first item
102 initialChanges.add(null);
104 final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
105 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
106 for (Object message : initialMessages.values()) {
107 if (message instanceof DataTreeChanged) {
108 final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
109 final DataTreeCandidate initial;
110 if (changes.size() != 1) {
111 final Iterator<DataTreeCandidate> it = changes.iterator();
113 // Append to changes to report as initial. This should not be happening (often?).
114 it.forEachRemaining(initialChanges::add);
116 initial = Iterables.get(changes, 0);
119 final NormalizedNode root = initial.getRootNode().getDataAfter().orElseThrow();
120 verify(root instanceof ContainerNode, "Unexpected root node %s", root);
121 ((ContainerNode) root).body().forEach(rootBuilder::withChild);
124 // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
125 initialMessages = null;
127 // Replace first element with the combined initial change, report initial changes and clear the map
128 initialChanges.set(0, DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
129 DataTreeCandidateNodes.written(rootBuilder.build())));
130 super.dataTreeChanged(new DataTreeChanged(initialChanges));
132 // Now go through all messages we have held back and report them. Note we are removing them from the queue
133 // to allow them to be reclaimed as soon as possible.
134 for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
135 super.dataTreeChanged(message);
137 otherMessages = null;
140 static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
141 return Props.create(RootDataTreeChangeListenerActor.class, instance, shardCount);