2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.spi.shard;
10 import com.google.common.annotations.Beta;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.function.Function;
21 import javax.annotation.concurrent.GuardedBy;
22 import org.opendaylight.yangtools.concepts.Builder;
23 import org.opendaylight.yangtools.concepts.Immutable;
24 import org.opendaylight.yangtools.concepts.Mutable;
27 * Aggregator which combines state reported by potentially multiple threads into a single state report. State is
28 * received concurrently to reports and reporter threads are hijacked when there is state to be reported and no thread
31 * @param <S> State type
32 * @author Robert Varga
35 public abstract class AbstractStateAggregator<S extends AbstractStateAggregator.State> {
38 * Marker base class for state, both as reported up and as reported down.
40 public abstract static class State implements Immutable {
45 * State aggregator, which receives state chunks and creates an aggregated state object via the build method.
46 * Note that all internal state must be protected by the lock on the builder object itself.
48 * @param <S> State type
50 protected abstract static class StateBuilder<S extends State> implements Builder<S>, Mutable {
// Folds in a state chunk while the aggregator is operational. Operational.receiveState() calls this
// while holding the monitor of this builder.
52 protected abstract void append(S state);
// Folds in a state chunk that arrives while the aggregator is still in the Starting behavior; called from
// Starting.receiveState().
54 protected abstract void appendInitial(S state);
// Common base of the aggregator's behaviors. Starting extends it directly; Failed and Operational extend it
// through Started.
57 protected abstract static class Behavior<B extends Behavior<B, S>, S extends State> {
// Returns the state builders known to this behavior.
59 abstract Collection<StateBuilder<S>> builders();
// Delivers a state chunk destined for the given builder; how it is applied is up to the concrete behavior.
61 abstract void receiveState(StateBuilder<S> builder, S state);
// Initial behavior: collects builders in a mutable list and accepts state until a Started successor takes over.
64 private static final class Starting<S extends State> extends Behavior<Starting<S>, S> {
65 private final Collection<StateBuilder<S>> builders;
// Behavior which replaces this one. Read under this object's monitor by receiveState() and start().
// NOTE(review): the assignment is not visible in this view -- presumably start() stores its result here; confirm.
67 private Started<S> successor;
// The size hint only pre-sizes the backing ArrayList.
69 Starting(final int sizeHint) {
70 builders = new ArrayList<>(sizeHint);
// Registers a builder; a null builder is rejected by Preconditions.checkNotNull().
73 void add(final StateBuilder<S> builder) {
74 builders.add(Preconditions.checkNotNull(builder));
78 Collection<StateBuilder<S>> builders() {
// Runs under this object's monitor, the same one start() holds.
83 synchronized void receiveState(final StateBuilder<S> builder, final S state) {
// A successor is already in place: hand the state over to it.
// NOTE(review): presumably followed by an early return so the state is not appended twice; confirm.
84 if (successor != null) {
85 successor.receiveState(builder, state);
// Not started yet: record the chunk as initial state.
89 builder.appendInitial(state);
// Creates the successor behavior. Fails through checkState() if a successor already exists. The factory
// function receives an immutable snapshot of the builders and must not return null (verifyNotNull).
92 synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
93 Preconditions.checkState(successor == null, "Attempted to start an already-started aggregator");
94 final Started<S> next = Verify.verifyNotNull(function.apply(ImmutableList.copyOf(builders)));
// Base of the behaviors used after start; Failed and Operational extend it. The set of builders is fixed at
// construction time.
100 protected abstract static class Started<S extends State> extends Behavior<Started<S>, S> {
101 private final Collection<StateBuilder<S>> builders;
// Takes an immutable copy, so later changes to the caller's collection are not reflected here.
103 Started(final Collection<? extends StateBuilder<S>> builders) {
104 this.builders = ImmutableList.copyOf(builders);
108 final Collection<StateBuilder<S>> builders() {
// Terminal Started behavior; final, so it cannot be specialized further.
// NOTE(review): judging by the name this represents an aggregator that has failed -- confirm against callers.
113 protected static final class Failed<S extends State> extends Started<S> {
114 protected Failed(final Collection<? extends StateBuilder<S>> builders) {
// NOTE(review): the body is not visible in this view -- presumably incoming state is ignored; confirm.
119 void receiveState(final StateBuilder<S> builder, final S state) {
// Started behavior which appends incoming state to builders and notifies a listener with the built states.
124 protected abstract static class Operational<S extends State> extends Started<S> {
125 // Locking is a combination of a generation counter and a semaphore. Generation is bumped and remembered
126 // on stack when new state is being appended. Processed generations are recorded separately. This can cause
127 // false-positives when we loop on empty state, but that should not happen often and is harmless.
// Taken with compareAndSet(false, true) by the tryNotifyListener() loop and released with set(false).
128 private final AtomicBoolean semaphore = new AtomicBoolean();
// Incremented once for every state chunk appended in receiveState().
129 private final AtomicLong generation = new AtomicLong();
// Compared against the caller's generation in tryNotifyListener().
// NOTE(review): the write to this field is not visible in this view -- presumably recorded just before
// notifyListener() runs; confirm.
131 private volatile long processed;
133 protected Operational(final Collection<? extends StateBuilder<S>> builders) {
// Called from tryNotifyListener() with an iterator that applies StateBuilder::build to each builder as it is
// traversed (Iterators.transform over builders()).
137 protected abstract void notifyListener(Iterator<S> iterator);
// Appends the state to the builder under the builder's monitor, then attempts to notify the listener with the
// generation number obtained for this append.
140 final void receiveState(final StateBuilder<S> builder, final S state) {
141 synchronized (builder) {
142 // Generation has to be bumped atomically with state delivery, otherwise tryNotifyListener could
143 // observe the builder after the generation was bumped but before the state was appended
144 final long gen = generation.incrementAndGet();
146 builder.append(state);
148 tryNotifyListener(gen);
// Runs the listener notification unless the caller's generation has already been processed or another thread
// currently holds the semaphore.
// NOTE(review): the declaration of the local 'gen' is not visible in this view -- presumably initialized from
// initGen and advanced to nextGen when the loop repeats; confirm.
153 private void tryNotifyListener(final long initGen) {
156 // We now have to re-sync, as we may end up being the last thread in position to observe the complete state
157 // of the queues. Since queues are updated independently to iteration, notifyListener() may have missed
158 // some updates, in which case we must eventually run it.
160 // Check if this generation was processed by someone else (while we were inserting items) or if there is
161 // somebody else already running this loop (which means they will re-check and spin again).
162 while (gen != processed && semaphore.compareAndSet(false, true)) {
// Each builder is built only when the listener advances the iterator.
165 notifyListener(Iterators.transform(builders().iterator(), StateBuilder::build));
// Release the semaphore so that another thread may enter the loop.
167 semaphore.set(false);
// Re-read the generation to detect state appended while the listener was running.
170 final long nextGen = generation.get();
171 if (nextGen == gen) {
172 // No modifications happened, we are done
// Current behavior; volatile because receiveState() and checkStarting() read it without holding a lock.
181 private volatile Behavior<?, S> behavior;
// A new aggregator begins in the Starting behavior; sizeHint is passed on to pre-size its builder list.
183 protected AbstractStateAggregator(final int sizeHint) {
184 this.behavior = new Starting<>(sizeHint);
// Registers a builder. Only valid while the aggregator is in the Starting behavior; otherwise checkStarting()
// fails its checkState().
187 protected final void addBuilder(final StateBuilder<S> builder) {
188 checkStarting().add(builder);
// Starts the aggregator by asking the Starting behavior to create its successor through the supplied function.
// Runs under this aggregator's monitor and requires the current behavior to be Starting.
// NOTE(review): storing the result into 'behavior' is not visible in this view -- presumably done before
// returning; confirm.
191 protected final synchronized Started<S> start(final Function<Collection<StateBuilder<S>>, Started<S>> function) {
192 final Started<S> ret = checkStarting().start(function);
// Forwards a state chunk to whichever behavior is current at the time of the (volatile) read.
197 protected final void receiveState(final StateBuilder<S> builder, final S state) {
198 behavior.receiveState(builder, state);
// Returns the current behavior as Starting, failing through checkState() if the aggregator is in any other
// behavior.
201 @SuppressWarnings("unchecked")
202 private Starting<S> checkStarting() {
// Read the volatile field once so that the check and the cast operate on the same object.
203 final Behavior<?, S> local = behavior;
204 Preconditions.checkState(local instanceof Starting, "Unexpected behavior %s", local);
// The cast is unchecked only with respect to the type argument; the instanceof check above covers the class.
205 return (Starting<S>) local;