BUG-8733: Add ListenableDOMDataTreeShard
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / shard / AbstractStateAggregator.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi.shard;
9
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.function.Function;
21 import javax.annotation.concurrent.GuardedBy;
22 import org.opendaylight.yangtools.concepts.Builder;
23 import org.opendaylight.yangtools.concepts.Immutable;
24 import org.opendaylight.yangtools.concepts.Mutable;
25
26 /**
27  * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
28  * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
29  * is reporting it.
30  *
31  * @param <S> State type
32  * @author Robert Varga
33  */
34 @Beta
35 public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
36
37     /**
38      * Marker interface for state as both reported up and down.
39      */
40     public abstract static class State implements Immutable {
41
42     }
43
44     /**
45      * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
46      * Note all internal state must be protected by the the lock on the builder project itself.
47      *
48      * @param <S> State type
49      */
50     protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
51
52         protected abstract void append(S state);
53
54         protected abstract void appendInitial(S state);
55     }
56
57     protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
58
59         abstract Collection<StateBuilder<S>> builders();
60
61         abstract void receiveState(StateBuilder<S> builder, S state);
62     }
63
64     private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
65         private final Collection<StateBuilder<S>> builders;
66         @GuardedBy("this")
67         private Started<S> successor;
68
69         Starting(final int sizeHint) {
70             builders = new ArrayList<>(sizeHint);
71         }
72
73         void add(final StateBuilder<S> builder) {
74             builders.add(Preconditions.checkNotNull(builder));
75         }
76
77         @Override
78         Collection<StateBuilder<S>> builders() {
79             return builders;
80         }
81
82         @Override
83         synchronized void receiveState(final StateBuilder<S> builder, final S state) {
84             if (successor != null) {
85                 successor.receiveState(builder, state);
86                 return;
87             }
88
89             builder.appendInitial(state);
90         }
91
92         synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
93             Preconditions.checkState(successor == null, "Attempted to start an already-started aggregator");
94             final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
95             successor = next;
96             return next;
97         }
98     }
99
100     protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
101         private final Collection<StateBuilder<S>> builders;
102
103         Started(final Collection<? extends StateBuilder<S>> builders) {
104             this.builders = ImmutableList.copyOf(builders);
105         }
106
107         @Override
108         final Collection<StateBuilder<S>> builders() {
109             return builders;
110         }
111     }
112
113     protected static final class Failed<S extends State> extends Started<S> {
114         protected Failed(final Collection<? extends StateBuilder<S>> builders) {
115             super(builders);
116         }
117
118         @Override
119         void receiveState(final StateBuilder<S> builder, final S state) {
120             // Intentional no-op
121         }
122     }
123
124     protected abstract static class Operational<S extends State> extends Started<S> {
125         // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
126         // on stack when new state is being appended. Processed generations are recorded separately. This can cause
127         // false-positives when we loop on empty state, but that should not happen often and is harmless.
128         private final AtomicBoolean semaphore = new AtomicBoolean();
129         private final AtomicLong generation = new AtomicLong();
130
131         private volatile long processed;
132
133         protected Operational(final Collection<? extends StateBuilder<S>> builders) {
134             super(builders);
135         }
136
137         protected abstract void notifyListener(Iterator<S> iterator);
138
139         @Override
140         final void receiveState(final StateBuilder<S> builder, final S state) {
141             synchronized (builder) {
142                 // Generation has to be bumbed atomically with state delivery, otherwise tryNotifyListener could
143                 // observe state with after generation was bumped and before the state was appended
144                 final long gen = generation.incrementAndGet();
145                 try {
146                     builder.append(state);
147                 } finally {
148                     tryNotifyListener(gen);
149                 }
150             }
151         }
152
153         private void tryNotifyListener(final long initGen) {
154             long gen = initGen;
155
156             // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
157             // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
158             // some updates, in which case we must eventually run it.
159             //
160             // Check if this generation was processed by someone else (while we were inserting items) or if there is
161             // somebody else already running this loop (which means they will re-check and spin again).
162             while (gen != processed && semaphore.compareAndSet(false, true)) {
163                 try {
164                     processed = gen;
165                     notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
166                 } finally {
167                     semaphore.set(false);
168                 }
169
170                 final long nextGen = generation.get();
171                 if (nextGen == gen) {
172                     // No modifications happened, we are done
173                     return;
174                 }
175
176                 gen = nextGen;
177             }
178         }
179     }
180
181     private volatile Behavior<?, S> behavior;
182
183     protected AbstractStateAggregator(final int sizeHint) {
184         this.behavior = new Starting<>(sizeHint);
185     }
186
187     protected final void addBuilder(final StateBuilder<S> builder) {
188         checkStarting().add(builder);
189     }
190
191     protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
192         final Started<S> ret = checkStarting().start(function);
193         behavior = ret;
194         return ret;
195     }
196
197     protected final void receiveState(final StateBuilder<S> builder, final S state) {
198         behavior.receiveState(builder, state);
199     }
200
201     @SuppressWarnings("unchecked")
202     private Starting<S> checkStarting() {
203         final Behavior<?, S> local = behavior;
204         Preconditions.checkState(local instanceof Starting, "Unexpected behavior %s", local);
205         return (Starting<S>) local;
206     }
207 }