2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.spi.shard;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.Beta;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Iterators;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Iterator;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.function.Function;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.opendaylight.yangtools.concepts.Builder;
25 import org.opendaylight.yangtools.concepts.Immutable;
26 import org.opendaylight.yangtools.concepts.Mutable;
29 * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
30 * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
33 * @param <S> State type
34 * @author Robert Varga
36 * @deprecated This class is scheduled for removal in the next major release.
// Base class of the aggregator. The active Behavior is held in a volatile field and receiveState() forwards
// to it; addBuilder() and start() are only accepted while that behavior is still a Starting instance, which
// checkStarting() enforces.
39 @Deprecated(forRemoval = true)
40 public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
43 * Marker base class for state as both reported up and down.
// Common supertype bounding the S type parameter used throughout this file; abstract, and it implements the
// Immutable interface.
45 public abstract static class State implements Immutable {
50 * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
51 * Note all internal state must be protected by the lock on the builder object itself.
53 * @param <S> State type
// Mutable accumulator of state chunks. It is a Builder<S>, so its build method (referenced as
// StateBuilder::build by Operational) produces the aggregated S.
55 protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
// Adds a state chunk; called from Operational.receiveState() inside a block synchronized on this builder.
57 protected abstract void append(S state);
// Adds a state chunk on the Starting path; called from Starting.receiveState().
59 protected abstract void appendInitial(S state);
// Strategy object the aggregator delegates to. B is a self-referential type parameter; none of the members
// visible here use it.
62 protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
// Returns the builders this behavior operates on.
64 abstract Collection<StateBuilder<S>> builders();
// Handles a state chunk reported for the given builder.
66 abstract void receiveState(StateBuilder<S> builder, S state);
// Initial behavior: collects builders through add() and, once start() has produced a successor, forwards
// incoming state to that successor.
69 private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
// Builders registered so far; backed by a mutable ArrayList and copied when start() runs.
70 private final Collection<StateBuilder<S>> builders;
// Behavior that takes over after start(); null is treated as "not started yet" by receiveState() and start().
72 private Started<S> successor;
// sizeHint is used as the initial capacity of the builder list.
74 Starting(final int sizeHint) {
75 builders = new ArrayList<>(sizeHint);
// Registers a builder, rejecting null with a NullPointerException.
// NOTE(review): unlike receiveState()/start() this method is not synchronized -- presumably builders are
// registered before any concurrent use; confirm against callers.
78 void add(final StateBuilder<S> builder) {
79 builders.add(requireNonNull(builder));
83 Collection<StateBuilder<S>> builders() {
// Synchronized on this Starting instance (the same monitor start() takes). A non-null successor receives the
// state; appendInitial() is the path for state handled by this behavior itself.
// NOTE(review): the statements between the two calls are not visible in this excerpt.
88 synchronized void receiveState(final StateBuilder<S> builder, final S state) {
89 if (successor != null) {
90 successor.receiveState(builder, state);
94 builder.appendInitial(state);
// Creates the successor by applying the supplied function to an immutable copy of the registered builders.
// Throws IllegalStateException if a successor already exists; a null result from the function fails the
// verifyNotNull check.
// NOTE(review): the remainder of the body (presumably storing and returning next) is not visible here.
97 synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
98 checkState(successor == null, "Attempted to start an already-started aggregator");
99 final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
// Base class of the behaviors that Starting.start() can produce; holds an immutable list of builders.
105 protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
106 private final Collection<StateBuilder<S>> builders;
// Stores the builders as an ImmutableList, so the set of builders cannot change after construction.
108 Started(final Collection<? extends StateBuilder<S>> builders) {
109 this.builders = ImmutableList.copyOf(builders);
// Final: subclasses cannot substitute a different builder collection.
113 final Collection<StateBuilder<S>> builders() {
// Started variant for an aggregator in a failed condition (going by its name).
// NOTE(review): the constructor and receiveState() bodies are not visible in this excerpt; presumably the
// constructor passes builders to Started and incoming state is ignored -- confirm.
118 protected static final class Failed<S extends State> extends Started<S> {
119 protected Failed(final Collection<? extends StateBuilder<S>> builders) {
124 void receiveState(final StateBuilder<S> builder, final S state) {
// Started variant that appends incoming state to builders and reports the built states to a listener supplied
// by the subclass through notifyListener().
129 protected abstract static class Operational<S extends State> extends Started<S> {
130 // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
131 // on stack when new state is being appended. Processed generations are recorded separately. This can cause
132 // false-positives when we loop on empty state, but that should not happen often and is harmless.
133 private final AtomicBoolean semaphore = new AtomicBoolean();
134 private final AtomicLong generation = new AtomicLong();
// Compared with the caller's generation in tryNotifyListener() to decide whether a notification is still due.
136 private volatile long processed;
138 protected Operational(final Collection<? extends StateBuilder<S>> builders) {
// Subclass hook. The iterator passed in yields the result of build() for each builder, in builders() order.
142 protected abstract void notifyListener(Iterator<S> iterator);
// Bumps the generation and appends the state while holding the builder's monitor, then attempts a listener
// notification for the generation it obtained.
145 final void receiveState(final StateBuilder<S> builder, final S state) {
146 synchronized (builder) {
147 // Generation has to be bumped atomically with state delivery, otherwise tryNotifyListener could
148 // observe state after generation was bumped and before the state was appended
149 final long gen = generation.incrementAndGet();
151 builder.append(state);
153 tryNotifyListener(gen);
// Runs notifyListener() while the given generation is still unprocessed and the semaphore can be acquired.
// NOTE(review): several statements of this method (e.g. the declaration of gen and any update of processed)
// are not visible in this excerpt.
158 private void tryNotifyListener(final long initGen) {
161 // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
162 // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
163 // some updates, in which case we must eventually run it.
165 // Check if this generation was processed by someone else (while we were inserting items) or if there is
166 // somebody else already running this loop (which means they will re-check and spin again).
// The compareAndSet(false, true) succeeds for one thread at a time; it is only attempted when gen != processed.
167 while (gen != processed && semaphore.compareAndSet(false, true)) {
// Iterators.transform is lazy: each builder's build() runs as the listener advances the iterator.
170 notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
172 semaphore.set(false);
// Re-read the generation to detect state appended while the listener was being notified.
175 final long nextGen = generation.get();
176 if (nextGen == gen) {
177 // No modifications happened, we are done
// Currently active behavior. It is volatile; receiveState() and checkStarting() read it without synchronizing.
186 private volatile Behavior<?, S> behavior;
// Creates the aggregator with a Starting behavior; sizeHint is passed on as the initial capacity of its
// builder list.
188 protected AbstractStateAggregator(final int sizeHint) {
189 this.behavior = new Starting<>(sizeHint);
// Registers a builder with the Starting behavior. Throws IllegalStateException (from checkStarting()) if the
// current behavior is not Starting, and NullPointerException (from Starting.add()) if builder is null.
192 protected final void addBuilder(final StateBuilder<S> builder) {
193 checkStarting().add(builder);
// Asks the Starting behavior to create its successor using the supplied factory function. Synchronized on the
// aggregator instance; throws IllegalStateException if the current behavior is not Starting.
// NOTE(review): the rest of the body is not visible in this excerpt -- presumably ret is published to the
// behavior field and returned; confirm.
196 protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
197 final Started<S> ret = checkStarting().start(function);
// Forwards the state chunk to whichever behavior is current at the time of the call (a single volatile read).
202 protected final void receiveState(final StateBuilder<S> builder, final S state) {
203 behavior.receiveState(builder, state);
// Returns the current behavior as a Starting instance, or throws IllegalStateException naming the actual
// behavior when it is anything else.
206 @SuppressWarnings("unchecked")
207 private Starting<S> checkStarting() {
// Read the volatile field once so the instanceof check and the cast operate on the same object.
208 final Behavior<?, S> local = behavior;
209 checkState(local instanceof Starting, "Unexpected behavior %s", local);
210 return (Starting<S>) local;