Do not generate 'isFoo()' methods
[mdsal.git] / dom / mdsal-dom-spi / src / main / java / org / opendaylight / mdsal / dom / spi / shard / AbstractStateAggregator.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies, s.ro. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.spi.shard;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.annotations.Beta;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Iterators;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Iterator;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.function.Function;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.opendaylight.yangtools.concepts.Builder;
25 import org.opendaylight.yangtools.concepts.Immutable;
26 import org.opendaylight.yangtools.concepts.Mutable;
27
28 /**
29  * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
30  * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
31  * is reporting it.
32  *
33  * @param <S> State type
34  * @author Robert Varga
35  *
36  * @deprecated This interface is scheduled for removal in the next major release.
37  */
38 @Beta
39 @Deprecated(forRemoval = true)
40 public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
41
42     /**
43      * Marker interface for state as both reported up and down.
44      */
45     public abstract static class State implements Immutable {
46
47     }
48
49     /**
50      * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
51      * Note all internal state must be protected by the the lock on the builder project itself.
52      *
53      * @param <S> State type
54      */
55     protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
56
57         protected abstract void append(S state);
58
59         protected abstract void appendInitial(S state);
60     }
61
62     protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
63
64         abstract Collection<StateBuilder<S>> builders();
65
66         abstract void receiveState(StateBuilder<S> builder, S state);
67     }
68
69     private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
70         private final Collection<StateBuilder<S>> builders;
71         @GuardedBy("this")
72         private Started<S> successor;
73
74         Starting(final int sizeHint) {
75             builders = new ArrayList<>(sizeHint);
76         }
77
78         void add(final StateBuilder<S> builder) {
79             builders.add(requireNonNull(builder));
80         }
81
82         @Override
83         Collection<StateBuilder<S>> builders() {
84             return builders;
85         }
86
87         @Override
88         synchronized void receiveState(final StateBuilder<S> builder, final S state) {
89             if (successor != null) {
90                 successor.receiveState(builder, state);
91                 return;
92             }
93
94             builder.appendInitial(state);
95         }
96
97         synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
98             checkState(successor == null, "Attempted to start an already-started aggregator");
99             final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
100             successor = next;
101             return next;
102         }
103     }
104
105     protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
106         private final Collection<StateBuilder<S>> builders;
107
108         Started(final Collection<? extends StateBuilder<S>> builders) {
109             this.builders = ImmutableList.copyOf(builders);
110         }
111
112         @Override
113         final Collection<StateBuilder<S>> builders() {
114             return builders;
115         }
116     }
117
118     protected static final class Failed<S extends State> extends Started<S> {
119         protected Failed(final Collection<? extends StateBuilder<S>> builders) {
120             super(builders);
121         }
122
123         @Override
124         void receiveState(final StateBuilder<S> builder, final S state) {
125             // Intentional no-op
126         }
127     }
128
129     protected abstract static class Operational<S extends State> extends Started<S> {
130         // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
131         // on stack when new state is being appended. Processed generations are recorded separately. This can cause
132         // false-positives when we loop on empty state, but that should not happen often and is harmless.
133         private final AtomicBoolean semaphore = new AtomicBoolean();
134         private final AtomicLong generation = new AtomicLong();
135
136         private volatile long processed;
137
138         protected Operational(final Collection<? extends StateBuilder<S>> builders) {
139             super(builders);
140         }
141
142         protected abstract void notifyListener(Iterator<S> iterator);
143
144         @Override
145         final void receiveState(final StateBuilder<S> builder, final S state) {
146             synchronized (builder) {
147                 // Generation has to be bumbed atomically with state delivery, otherwise tryNotifyListener could
148                 // observe state with after generation was bumped and before the state was appended
149                 final long gen = generation.incrementAndGet();
150                 try {
151                     builder.append(state);
152                 } finally {
153                     tryNotifyListener(gen);
154                 }
155             }
156         }
157
158         private void tryNotifyListener(final long initGen) {
159             long gen = initGen;
160
161             // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
162             // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
163             // some updates, in which case we must eventually run it.
164             //
165             // Check if this generation was processed by someone else (while we were inserting items) or if there is
166             // somebody else already running this loop (which means they will re-check and spin again).
167             while (gen != processed && semaphore.compareAndSet(false, true)) {
168                 try {
169                     processed = gen;
170                     notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
171                 } finally {
172                     semaphore.set(false);
173                 }
174
175                 final long nextGen = generation.get();
176                 if (nextGen == gen) {
177                     // No modifications happened, we are done
178                     return;
179                 }
180
181                 gen = nextGen;
182             }
183         }
184     }
185
186     private volatile Behavior<?, S> behavior;
187
188     protected AbstractStateAggregator(final int sizeHint) {
189         this.behavior = new Starting<>(sizeHint);
190     }
191
192     protected final void addBuilder(final StateBuilder<S> builder) {
193         checkStarting().add(builder);
194     }
195
196     protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
197         final Started<S> ret = checkStarting().start(function);
198         behavior = ret;
199         return ret;
200     }
201
202     protected final void receiveState(final StateBuilder<S> builder, final S state) {
203         behavior.receiveState(builder, state);
204     }
205
206     @SuppressWarnings("unchecked")
207     private Starting<S> checkStarting() {
208         final Behavior<?, S> local = behavior;
209         checkState(local instanceof Starting, "Unexpected behavior %s", local);
210         return (Starting<S>) local;
211     }
212 }