2 * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider.impl;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import java.util.ArrayDeque;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
/**
 * Base class for handlers that submit write transactions at a fixed rate for a configured run time.
 *
 * <p>A single-threaded scheduled executor repeatedly triggers {@code execute()}; each write is obtained
 * from {@link #execWrite(long)} and tracked in a queue of outstanding futures until its callback fires.
 * The outcome is reported to the subclass through the abstract {@code runSuccessful}/{@code runTimedOut}
 * hooks (a {@code runFailed} hook is declared as well).
 */
28 abstract class AbstractTransactionHandler {
// Class-wide SLF4J logger.
36 private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
// Nanoseconds in one second. The constructor divides this by the requested transactions-per-second
// to obtain the period between two scheduled executions.
38 static final int SECOND_AS_NANO = 1000000000;
39 //2^20 as in the model
// Not referenced in the visible part of this class; presumably consumed by subclasses - confirm.
40 static final int MAX_ITEM = 1048576;
// Interned QNames that share the same first two arguments
// ("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15") and differ only in the
// last one: "id-ints", "id", "item", "number" and "id-int".
42 static final QName ID_INTS =
43 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
44 static final QName ID =
45 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
46 static final QName ITEM =
47 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
48 static final QName NUMBER =
49 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
51 public static final QName ID_INT =
52 QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
// Instance identifier built from ID_INTS alone.
53 public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
// ID_INTS_YID extended with the ID_INT node, then passed through toOptimized().
54 public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
// Timeout in seconds, going by the name; it is not referenced in the visible part of this class,
// so its consumer should be confirmed (presumably subclasses).
56 static final long INIT_TX_TIMEOUT_SECONDS = 125;
// Five minutes expressed in seconds. Used as the delay before checkComplete() runs once the run time
// has elapsed with futures still outstanding, and quoted in the resulting timeout message.
58 private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
// Single-threaded scheduler that runs the periodic execute() task and the one-shot checkComplete() task.
60 private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// Outstanding write futures: added in runningExecute(), removed in txSuccess()/txFailure().
61 private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
// Created unstarted; runningExecute() reads its elapsed time to decide when the run is over.
// NOTE(review): no call that starts this stopwatch is visible in this excerpt - confirm it is started.
62 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
// Total run time in nanoseconds, derived from params.getSeconds().
63 private final long runtimeNanos;
// Period between two scheduled executions in nanoseconds.
64 private final long delayNanos;
// Handle of the currently scheduled task (periodic execute() first, later the checkComplete() one-shot).
66 private ScheduledFuture<?> scheduledFuture;
// Id handed to the next transaction; also passed to runSuccessful() as the total count.
// NOTE(review): the methods below also read and write a `state` field whose declaration is not
// visible in this excerpt.
67 private long txCounter;
/**
 * Derives the run time and the scheduling period from the supplied parameters.
 *
 * @param params source of the run time in seconds ({@code getSeconds()}) and of the requested rate
 *               ({@code getTransactionsPerSecond()})
 */
70 AbstractTransactionHandler(final TransactionsParams params) {
71 runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
// NOTE(review): the return type of getTransactionsPerSecond() is not visible here. If it is an
// integral value, a rate of 0 would fail this division and a rate above SECOND_AS_NANO would yield
// a period of 0 - confirm the model constrains the value.
72 delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
/**
 * Starts the run: schedules {@code execute()} on the executor at a fixed rate of {@code delayNanos}
 * nanoseconds with no initial delay, and moves the handler to the RUNNING state.
 */
75 final synchronized void doStart() {
76 scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
78 state = State.RUNNING;
/**
 * Periodic task body scheduled by {@code doStart()}; runs while holding this object's monitor.
 *
 * <p>NOTE(review): only the fallback for an unhandled state is visible in this excerpt; the dispatch
 * on {@code state} (presumably delegating to {@code runningExecute()} while running) is not - confirm.
 *
 * @throws IllegalStateException when the current state is not handled
 */
81 private synchronized void execute() {
84 // This could happen due to scheduling artifacts
90 throw new IllegalStateException("Unhandled state " + state);
/**
 * One tick of the run: either winds the run down once the configured run time has elapsed, or issues
 * one more write transaction and tracks its future.
 *
 * <p>Declared without {@code synchronized}; the visible code only shows it being relevant to
 * {@code execute()}, which is synchronized - presumably that is its only caller, confirm.
 */
94 private void runningExecute() {
95 final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
96 if (elapsed >= runtimeNanos) {
97 LOG.debug("Reached maximum run time with {} outstanding futures", futures.size());
// Futures are still outstanding: stop issuing new transactions and arm a one-shot check that fires
// after DEAD_TIMEOUT_SECONDS. cancel(false) does not interrupt a tick that is already running.
98 if (!checkSuccessful()) {
99 state = State.WAITING;
100 scheduledFuture.cancel(false);
101 scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
// NOTE(review): the end of the branch above (and any early return) is not visible in this excerpt.
108 // Not completed yet: create a transaction and hook it up
109 final long txId = txCounter++;
110 final ListenableFuture<Void> execFuture = execWrite(txId);
111 LOG.debug("New future #{} allocated", txId);
113 // Ordering is important: we need to add the future before hooking the callback
114 futures.add(execFuture);
// The callback forwards completion to txSuccess()/txFailure(), which remove the future again.
// NOTE(review): the remaining arguments of addCallback are not visible in this excerpt.
115 Futures.addCallback(execFuture, new FutureCallback<Void>() {
117 public void onSuccess(final Void result) {
118 txSuccess(execFuture, txId);
122 public void onFailure(final Throwable cause) {
123 txFailure(execFuture, txId, cause);
/**
 * Completion callback for a successful write: logs it and drops the future from the outstanding queue.
 *
 * <p>NOTE(review): the state-dependent handling that follows the removal is not visible in this
 * excerpt; only its fallback for an unhandled state is.
 *
 * @param execFuture the future that completed
 * @param txId       id assigned to the transaction when it was created
 * @throws IllegalStateException when the current state is not handled
 */
128 final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
129 LOG.debug("Future #{} completed successfully", txId);
130 futures.remove(execFuture);
141 throw new IllegalStateException("Unhandled state " + state);
/**
 * Completion callback for a failed write: logs the failure and drops the future from the outstanding
 * queue. In the visible lines the handler is then moved to the FAILED state and the scheduled task is
 * cancelled without interruption.
 *
 * <p>NOTE(review): the conditions guarding the state change, and any call reporting the failure to
 * the subclass, are not visible in this excerpt - confirm.
 *
 * @param execFuture the future that failed
 * @param txId       id assigned to the transaction when it was created
 * @param cause      failure reported by the future
 * @throws IllegalStateException when the current state is not handled
 */
145 final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
// cause is passed after the single placeholder argument, i.e. as the trailing throwable.
146 LOG.debug("Future #{} failed", txId, cause);
147 futures.remove(execFuture);
155 state = State.FAILED;
156 scheduledFuture.cancel(false);
161 throw new IllegalStateException("Unhandled state " + state);
/**
 * One-shot task scheduled by {@code runningExecute()} to run DEAD_TIMEOUT_SECONDS after the run time
 * elapsed with futures still outstanding.
 *
 * <p>In the visible lines it polls every outstanding future with a zero timeout, logging a warning for
 * each one that is not complete, has failed, or whose inspection was interrupted, then marks the
 * handler FAILED and reports a {@link TimeoutException} through {@code runTimedOut}.
 *
 * <p>NOTE(review): the declaration and increment of {@code offset}, and any early exit before the
 * loop, are not visible in this excerpt.
 */
165 private synchronized void checkComplete() {
166 final int size = futures.size();
172 for (ListenableFuture<Void> future : futures) {
// Zero timeout: inspect the future's current status without waiting for it.
174 future.get(0, TimeUnit.NANOSECONDS);
175 } catch (final TimeoutException e) {
176 LOG.warn("Future #{}/{} not completed yet", offset, size);
177 } catch (final ExecutionException e) {
178 LOG.warn("Future #{}/{} failed", offset, size, e.getCause());
// NOTE(review): no restoration of the thread's interrupt flag is visible here - confirm whether
// that is intended.
179 } catch (final InterruptedException e) {
180 LOG.warn("Interrupted while examining future #{}/{}", offset, size, e);
186 state = State.FAILED;
187 runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
/**
 * Checks whether every issued transaction has completed. When the outstanding queue is empty it
 * marks the handler SUCCESSFUL, cancels the scheduled task without interruption and reports the
 * number of transactions issued through {@code runSuccessful}.
 *
 * <p>NOTE(review): the return statements are not visible in this excerpt; the caller in
 * {@code runningExecute()} treats a {@code false} result as "futures still outstanding".
 *
 * @return per the caller's usage, whether the run was completed - confirm against the full source
 */
190 private boolean checkSuccessful() {
191 if (futures.isEmpty()) {
192 LOG.debug("Completed waiting for all futures");
193 state = State.SUCCESSFUL;
194 scheduledFuture.cancel(false);
196 runSuccessful(txCounter);
/**
 * Issues one write transaction. Called by {@code runningExecute()} once per tick while the run time
 * has not elapsed; the returned future is tracked until its success or failure callback fires.
 *
 * @param txId id of the transaction, taken from a counter that increases by one per call
 * @return future that completes when the write finishes
 */
203 abstract ListenableFuture<Void> execWrite(final long txId);
/**
 * Hook for reporting a failed run to the subclass.
 *
 * <p>NOTE(review): no call site is visible in this excerpt; presumably invoked from the failure
 * path in {@code txFailure()} - confirm.
 *
 * @param cause the failure being reported
 */
205 abstract void runFailed(Throwable cause);
/**
 * Hook invoked by {@code checkSuccessful()} once no futures are outstanding.
 *
 * @param allTx value of the transaction counter at that point, i.e. the number of transactions issued
 */
207 abstract void runSuccessful(long allTx);
/**
 * Hook invoked by {@code checkComplete()} after the handler has been marked FAILED because the
 * outstanding futures did not finish within the dead timeout.
 *
 * @param cause the exception describing the timeout (a {@code TimeoutException} at the visible call site)
 */
209 abstract void runTimedOut(Exception cause);