2 * Copyright (c) 2017 Pantheon Technologies, s.ro. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.spi.shard;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Stopwatch;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Iterables;
16 import com.google.common.collect.Iterators;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.List;
23 import java.util.Optional;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
28 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * A compatibility class for bridging {@link DOMDataTreeChangeListener}, which can listen on only a single subtree,
40 * with the {@link DOMDataTreeListener} interface.
42 * @author Robert Varga
43 * @deprecated This class is scheduled for removal when we remove compatibility with dom.spi.store APIs.
46 final class DOMDataTreeChangeListenerAggregator
47 extends AbstractStateAggregator<DOMDataTreeChangeListenerAggregator.State> {
49 static final class State extends AbstractStateAggregator.State implements Identifiable<DOMDataTreeIdentifier> {
50 private final DOMDataTreeIdentifier identifier;
51 final List<DataTreeCandidate> changes;
53 State(final DOMDataTreeIdentifier identifier, final List<DataTreeCandidate> changes) {
54 this.identifier = requireNonNull(identifier);
55 this.changes = requireNonNull(changes);
59 public DOMDataTreeIdentifier getIdentifier() {
// Per-subtree accumulator: collects DataTreeCandidates reported for one DOMDataTreeIdentifier and hands them
// out as a State. All three overriding methods below are synchronized on this builder instance.
// NOTE(review): this listing does not show every statement of the class (for example whether the accumulated
// list is ever cleared) -- confirm against the complete file before relying on these notes.
64 private static final class StateBuilder extends AbstractStateAggregator.StateBuilder<State> {
// Changes accumulated so far; only touched from the synchronized methods visible here.
66 private final List<DataTreeCandidate> changes = new ArrayList<>();
// Subtree this builder reports on; stamped onto every State produced by build().
67 private final DOMDataTreeIdentifier identifier;
// Rejects a null identifier with a NullPointerException.
69 StateBuilder(final DOMDataTreeIdentifier identifier) {
70 this.identifier = requireNonNull(identifier);
// Appends every candidate carried by the incoming state to the accumulated list, preserving order.
74 protected synchronized void append(final State state) {
75 changes.addAll(state.changes);
// Start-up variant of append(): only the last candidate of the incoming state is considered.
79 protected synchronized void appendInitial(final State state) {
80 // We are still starting up, so all we need to do is squash reported changes to an initial write event
// Iterables.getLast() throws NoSuchElementException for an empty list -- presumably callers never report an
// empty change list here; TODO confirm.
81 final DataTreeCandidate last = Iterables.getLast(state.changes);
// If the last candidate leaves data behind, record it as a single candidate built from that data at the
// same root path; if no data is present, nothing is added by the visible code.
83 final Optional<NormalizedNode<?, ?>> lastData = last.getRootNode().getDataAfter();
84 if (lastData.isPresent()) {
85 changes.add(DataTreeCandidates.fromNormalizedNode(last.getRootPath(), lastData.get()));
// Produces a State holding an immutable copy of the accumulated changes, so later appends cannot alter it.
90 public synchronized State build() {
91 final State ret = new State(identifier, ImmutableList.copyOf(changes));
97 private static final class Operational extends AbstractStateAggregator.Operational<State> {
98 private final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees = new HashMap<>();
99 private final DOMDataTreeListener listener;
101 Operational(final Collection<AbstractStateAggregator.StateBuilder<State>> builders,
102 final DOMDataTreeListener listener) {
104 this.listener = requireNonNull(listener);
108 protected void notifyListener(final Iterator<State> iterator) {
109 final Stopwatch clock = Stopwatch.createStarted();
110 final List<DataTreeCandidate> changes = new ArrayList<>();
111 while (iterator.hasNext()) {
112 final State state = iterator.next();
113 final List<DataTreeCandidate> candidates = state.changes;
114 if (!candidates.isEmpty()) {
115 // Update current subtree snapshot based on last candidate node
116 final DataTreeCandidateNode lastRoot = candidates.get(candidates.size() - 1).getRootNode();
117 final Optional<NormalizedNode<?, ?>> optData = lastRoot.getDataAfter();
118 if (optData.isPresent()) {
119 subtrees.put(state.getIdentifier(), optData.get());
121 subtrees.remove(state.getIdentifier());
125 changes.addAll(candidates);
129 final int size = changes.size();
131 // Note: it is okay to leak changes, we must never leak mutable subtrees.
132 listener.onDataTreeChanged(changes, ImmutableMap.copyOf(subtrees));
133 LOG.trace("Listener {} processed {} changes in {}", listener, clock);
// Class-wide SLF4J logger; used for trace-level timing output after a listener has been notified.
138 private static final Logger LOG = LoggerFactory.getLogger(DOMDataTreeChangeListenerAggregator.class);
// Flag captured at construction time. Per the TODO in createListener() it is currently not acted upon.
140 private final boolean allowRxMerges;
// Creates an aggregator and stores the allowRxMerges flag.
// NOTE(review): sizeHint is not used by any statement visible in this listing -- presumably it is handed to
// the superclass constructor; confirm against the complete file.
142 DOMDataTreeChangeListenerAggregator(final int sizeHint, final boolean allowRxMerges) {
144 this.allowRxMerges = allowRxMerges;
// Creates a DOMDataTreeChangeListener for the given subtree. Each callback wraps an immutable copy of the
// reported changes in a State tagged with treeId and passes it, with this subtree's builder, to receiveState().
// NOTE(review): the visible statements do not show the builder being registered with the aggregator --
// presumably that happens on a line missing from this listing; confirm against the complete file.
147 DOMDataTreeChangeListener createListener(final DOMDataTreeIdentifier treeId) {
148 // TODO: do not ignore allowRxMerges, but rather create a dedicated subclass or something
149 final StateBuilder builder = new StateBuilder(treeId);
152 return changes -> receiveState(builder, new State(treeId, ImmutableList.copyOf(changes)));
// Switches the aggregator to notifying the supplied listener and returns a registration for it.
// Visible steps: build an Operational around the builders and the listener, push one notification composed
// of the State built from every builder, and return a registration whose removal closes every entry of regs.
// NOTE(review): this listing does not show where `builders` is bound or how the Operational instance is
// installed as the current state -- confirm against the complete file.
155 <L extends DOMDataTreeListener> ListenerRegistration<L> start(final L listener,
156 final Collection<ListenerRegistration<?>> regs) {
158 final Operational ret = new Operational(builders, listener);
// Iterators.transform() is lazy: build() is invoked on each builder as notifyListener() iterates.
159 ret.notifyListener(Iterators.transform(builders.iterator(), AbstractStateAggregator.StateBuilder::build));
163 return new AbstractListenerRegistration<L>(listener) {
165 protected void removeRegistration() {
// Closing this registration tears down all underlying per-subtree registrations.
166 regs.forEach(ListenerRegistration::close);