Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RootDataTreeChangeListenerActor.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.collect.Iterables;
16 import java.util.ArrayDeque;
17 import java.util.Collection;
18 import java.util.Deque;
19 import java.util.Iterator;
20 import java.util.LinkedHashMap;
21 import java.util.Map;
22 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
23 import org.opendaylight.controller.cluster.datastore.messages.OnInitialData;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.opendaylight.yangtools.yang.data.api.schema.builder.DataContainerNodeBuilder;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
33 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 final class RootDataTreeChangeListenerActor extends DataTreeChangeListenerActor {
37     private final int shardCount;
38
39     // Initial messages, retaining order in which we have received them
40     private Map<ActorRef, Object> initialMessages = new LinkedHashMap<>();
41     private Deque<DataTreeChanged> otherMessages = new ArrayDeque<>();
42
43     private RootDataTreeChangeListenerActor(final DOMDataTreeChangeListener listener, final int shardCount) {
44         super(listener, YangInstanceIdentifier.empty());
45         this.shardCount = shardCount;
46     }
47
48     @Override
49     void onInitialData(final OnInitialData message) {
50         final ActorRef sender = getSender();
51         verifyNotNull(initialMessages, "Received OnInitialData from %s after initial convergence", sender);
52
53         final Object prev = initialMessages.put(sender, message);
54         verify(prev == null, "Received OnInitialData from %s after %s", sender, prev);
55         checkInitialConvergence();
56     }
57
58     @Override
59     void dataTreeChanged(final DataTreeChanged message) {
60         if (initialMessages == null) {
61             super.dataTreeChanged(message);
62         } else {
63             processMessage(message);
64         }
65     }
66
67     private void processMessage(final DataTreeChanged message) {
68         // Put the message into initial messages if we do not have a message from that actor yet. If we do, just stash
69         // it to other messages for later processing.
70         if (initialMessages.putIfAbsent(getSender(), message) == null) {
71             checkInitialConvergence();
72         } else {
73             otherMessages.addLast(message);
74         }
75     }
76
77     private void checkInitialConvergence() {
78         if (initialMessages.size() != shardCount) {
79             // We do not have initial state from all shards yet
80             return;
81         }
82
83         /*
84          * We need to make-pretend that the data coming into the listener is coming from a single logical entity, where
85          * ordering is partially guaranteed (on shard boundaries). The data layout in shards is such that each DataTree
86          * is rooted at YangInstanceIdentifier.empty(), but their contents vary:
87          *
88          * 1) non-default shards contain immediate children of root from one module
89          * 2) default shard contains everything else
90          * 3) there is no overlap between shards
91          *
92          * When we subscribe to each of the shards, each of them will report root as being written, which is an accurate
93          * view from each shard's perspective, but it does not reflect the aggregate reality.
94          *
95          * Construct an overall NormalizedNode view of the entire datastore by combining first-level children from all
96          * reported initial state reports, report that node as written and then report any additional deltas.
97          */
98         final Deque<DataTreeCandidate> initialChanges = new ArrayDeque<>();
99         final DataContainerNodeBuilder<NodeIdentifier, ContainerNode> rootBuilder = Builders.containerBuilder()
100                 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME));
101         for (Object message : initialMessages.values()) {
102             if (message instanceof DataTreeChanged) {
103                 final Collection<DataTreeCandidate> changes = ((DataTreeChanged) message).getChanges();
104                 final DataTreeCandidate initial;
105                 if (changes.size() != 1) {
106                     final Iterator<DataTreeCandidate> it = changes.iterator();
107                     initial = it.next();
108                     // Append to changes to report as initial. This should not be happening (often?).
109                     it.forEachRemaining(initialChanges::addLast);
110                 } else {
111                     initial = Iterables.get(changes, 0);
112                 }
113
114                 final NormalizedNode root = initial.getRootNode().getDataAfter().orElseThrow();
115                 verify(root instanceof ContainerNode, "Unexpected root node %s", root);
116                 ((ContainerNode) root).body().forEach(rootBuilder::withChild);
117             }
118         }
119         // We will not be intercepting any other messages, allow initial state to be reclaimed as soon as possible
120         initialMessages = null;
121
122         // Prepend combined initial changed and report initial changes and clear the map
123         initialChanges.addFirst(DataTreeCandidates.newDataTreeCandidate(YangInstanceIdentifier.empty(),
124             DataTreeCandidateNodes.written(rootBuilder.build())));
125         super.dataTreeChanged(new DataTreeChanged(initialChanges));
126
127         // Now go through all messages we have held back and report them. Note we are removing them from the queue
128         // to allow them to be reclaimed as soon as possible.
129         for (DataTreeChanged message = otherMessages.poll(); message != null; message = otherMessages.poll()) {
130             super.dataTreeChanged(message);
131         }
132         otherMessages = null;
133     }
134
135     static Props props(final DOMDataTreeChangeListener instance, final int shardCount) {
136         return Props.create(RootDataTreeChangeListenerActor.class, instance, shardCount);
137     }
138 }