2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider.impl;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Optional;
19 import java.util.Queue;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import javax.annotation.concurrent.GuardedBy;
27 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 abstract class AbstractTransactionHandler {
34 private abstract static class Phase {
35 abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
37 abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
40 private static final class Running extends Phase {
41 private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
42 private Throwable failure;
44 void addFuture(final ListenableFuture<Void> execFuture) {
45 futures.add(execFuture);
49 void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
50 futures.remove(execFuture);
54 void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
55 futures.remove(execFuture);
56 if (failure == null) {
61 Optional<Throwable> getFailure() {
62 return Optional.ofNullable(failure);
66 private final class Collecting extends Phase {
67 private final List<ListenableFuture<Void>> futures;
70 Collecting(final Collection<ListenableFuture<Void>> futures) {
71 this.futures = new ArrayList<>(futures);
75 void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
76 futures.remove(execFuture);
77 if (futures.isEmpty() && !done) {
78 LOG.debug("All futures completed successfully.");
79 runSuccessful(txCounter);
84 void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
85 futures.remove(execFuture);
91 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
93 static final int SECOND_AS_NANO = 1000000000;
94 //2^20 as in the model
95 static final int MAX_ITEM = 1048576;
97 static final QName ID_INTS =
98 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
99 static final QName ID =
100 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
101 static final QName ITEM =
102 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
103 static final QName NUMBER =
104 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
106 public static final QName ID_INT =
107 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
108 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
109 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
111 static final long INIT_TX_TIMEOUT_SECONDS = 125;
113 private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
115 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
116 private final Stopwatch stopwatch = Stopwatch.createStarted();
117 private final long runtimeNanos;
118 private final long delayNanos;
120 private ScheduledFuture<?> scheduledFuture;
121 private long txCounter;
125 AbstractTransactionHandler(final TransactionsParams params) {
126 runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
127 delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
130 final synchronized void doStart() {
131 phase = new Running();
132 scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
135 private void execute() {
136 final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
137 if (elapsed < runtimeNanos) {
138 // Not completed yet: create a transaction and hook it up
139 final long txId = txCounter++;
140 final ListenableFuture<Void> execFuture = execWrite(txId);
142 // Ordering is important: we need to add the future before hooking the callback
143 synchronized (this) {
144 ((Running) phase).addFuture(execFuture);
146 Futures.addCallback(execFuture, new FutureCallback<Void>() {
148 public void onSuccess(final Void result) {
149 txSuccess(execFuture, txId);
153 public void onFailure(final Throwable cause) {
154 txFailure(execFuture, txId, cause);
162 private synchronized void startCollection() {
163 scheduledFuture.cancel(false);
165 final Running running = (Running) phase;
166 final Optional<Throwable> failure = running.getFailure();
167 if (failure.isPresent()) {
169 runFailed(failure.get());
173 LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
174 if (running.futures.isEmpty()) {
176 runSuccessful(txCounter);
180 phase = new Collecting(running.futures);
181 executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
185 final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
186 LOG.debug("Future #{} completed successfully", txId);
187 phase.txSuccess(execFuture, txId);
190 final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
191 LOG.debug("Future #{} failed", txId, cause);
192 phase.txFailure(execFuture, txId, cause);
195 private synchronized void checkCollection() {
196 final Collecting collecting = (Collecting) phase;
197 if (!collecting.done) {
198 final int size = collecting.futures.size();
199 for (int i = 0; i < size; i++) {
200 final ListenableFuture<Void> future = collecting.futures.get(i);
203 future.get(0, TimeUnit.NANOSECONDS);
204 } catch (final TimeoutException e) {
205 LOG.warn("Future #{}/{} not completed yet", i, size);
206 } catch (final ExecutionException e) {
207 LOG.warn("Future #{}/{} failed", i, size, e.getCause());
208 } catch (final InterruptedException e) {
209 LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
213 runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
217 abstract ListenableFuture<Void> execWrite(final long txId);
219 abstract void runFailed(Throwable cause);
221 abstract void runSuccessful(long allTx);
223 abstract void runTimedOut(Exception cause);