NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RootDataTreeChangeListenerActor.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.collect.Iterables;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Deque;
20 import java.util.Iterator;
21 import java.util.LinkedHashMap;
22 import java.util.List;
23 import java.util.Map;
24 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
25 import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
32 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
33 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidateNodes;
35 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37
38 final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
39     private final int shardCount;
40
41     // Initial messages, retaining order in which we have received them
42     private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
43     private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
44
45     private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
46         super(listener, YangInstanceIdentifier.empty());
47         this.shardCount = shardCount;
48     }
49
50     @Override
51     void onInitialData(final OnInitialData message) {
52         final ActorRef sender = getSender();
53         verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
54
55         final Object prev = initialMessages.put(sender, message);
56         verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
57         checkInitialConvergence();
58     }
59
60     @Override
61     void dataTreeChanged(final DataTreeChanged message) {
62         if (initialMessages == null) {
63             super.dataTreeChanged(message);
64         } else {
65             processMessage(message);
66         }
67     }
68
69     private void processMessage(final DataTreeChanged message) {
70         // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
71         // it to other messages for later processing.
72         if (initialMessages.putIfAbsent(getSender(), message) == null) {
73             checkInitialConvergence();
74         } else {
75             otherMessages.addLast(message);
76         }
77     }
78
79     private void checkInitialConvergence() {
80         if (initialMessages.size() != shardCount) {
81             // We do not have initial state from all shards yet
82             return;
83         }
84
85         /*
86          * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
87          * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
88          * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
89          *
90          * 1) non-default shards contain immediate children of root from one module
91          * 2) default shard contains everything else
92          * 3) there is no overlap between shards
93          *
94          * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
95          * view from each shard's perspective, but it does not reflect the aggregate reality.
96          *
97          * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
98          * reported initial state reports, report that node as written and then report any additional deltas.
99          */
100         final List<DataTreeCandidate> initialChanges = new ArrayList<>();
101         // Reserve first item
102         initialChanges.add(null);
103
104         final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
105                 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
106         for (Object message : initialMessages.values()) {
107             if (message instanceof DataTreeChanged) {
108                 final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
109                 final DataTreeCandidate initial;
110                 if (changes.size() != 1) {
111                     final Iterator<DataTreeCandidate> it = changes.iterator();
112                     initial = it.next();
113                     // Append to changes to report as initial. This should not be happening (often?).
114                     it.forEachRemaining(initialChanges::add);
115                 } else {
116                     initial = Iterables.get(changes, 0);
117                 }
118
119                 final NormalizedNode root = initial.getRootNode().getDataAfter().orElseThrow();
120                 verify(root instanceof ContainerNode, "Unexpected root node %s", root);
121                 ((ContainerNode) root).body().forEach(rootBuilder::withChild);
122             }
123         }
124         // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
125         initialMessages = null;
126
127         // Replace first element with the combined initial change, report initial changes and clear the map
128         initialChanges.set(0, DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
129             DataTreeCandidateNodes.written(rootBuilder.build())));
130         super.dataTreeChanged(new DataTreeChanged(initialChanges));
131
132         // Now go through all messages we have held back and report them. Note we are removing them from the queue
133         // to allow them to be reclaimed as soon as possible.
134         for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
135             super.dataTreeChanged(message);
136         }
137         otherMessages = null;
138     }
139
140     static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
141         return Props.create(RootDataTreeChangeListenerActor.class, instance, shardCount);
142     }
143 }