BUG-8494: Cleanup clustering-it-provider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / AbstractTransactionHandler.java
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
new file mode 100644 (file)
index 0000000..f23e7ec
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.TransactionsParams;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractTransactionHandler {
+    private abstract static class Phase {
+        abstract void txSuccess(ListenableFuture<Void> execFuture, long txId);
+
+        abstract void txFailure(ListenableFuture<Void> execFuture, long txId, Throwable cause);
+    }
+
+    private static final class Running extends Phase {
+        private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+        private Throwable failure;
+
+        void addFuture(final ListenableFuture<Void> execFuture) {
+            futures.add(execFuture);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            if (failure != null) {
+                failure = cause;
+            }
+        }
+
+        Optional<Throwable> getFailure() {
+            return Optional.ofNullable(failure);
+        }
+    }
+
+    private final class Collecting extends Phase {
+        private final List<ListenableFuture<Void>> futures;
+        private boolean done;
+
+        Collecting(final Collection<ListenableFuture<Void>> futures) {
+            this.futures = new ArrayList<>(futures);
+        }
+
+        @Override
+        void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+            futures.remove(execFuture);
+            if (futures.isEmpty() && !done) {
+                LOG.debug("All futures completed successfully.");
+                runSuccessful(txCounter);
+            }
+        }
+
+        @Override
+        void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+            futures.remove(execFuture);
+            done = true;
+            runFailed(cause);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
+
+    static final int SECOND_AS_NANO = 1000000000;
+    //2^20 as in the model
+    static final int MAX_ITEM = 1048576;
+
+    static final QName ID_INTS =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-ints").intern();
+    static final QName ID =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id").intern();
+    static final QName ITEM =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "item").intern();
+    static final QName NUMBER =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "number").intern();
+
+    public static final QName ID_INT =
+            QName.create("tag:opendaylight.org,2017:controller:yang:lowlevel:target", "2017-02-15", "id-int").intern();
+    public static final YangInstanceIdentifier ID_INTS_YID = YangInstanceIdentifier.of(ID_INTS);
+    public static final YangInstanceIdentifier ID_INT_YID = ID_INTS_YID.node(ID_INT).toOptimized();
+
+    static final long INIT_TX_TIMEOUT_SECONDS = 125;
+
+    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final Stopwatch stopwatch = Stopwatch.createStarted();
+    private final long runtimeNanos;
+    private final long delayNanos;
+
+    private ScheduledFuture<?> scheduledFuture;
+    private long txCounter;
+    @GuardedBy("this")
+    private Phase phase;
+
+    AbstractTransactionHandler(final TransactionsParams params) {
+        runtimeNanos = TimeUnit.SECONDS.toNanos(params.getSeconds());
+        delayNanos = SECOND_AS_NANO / params.getTransactionsPerSecond();
+    }
+
+    final synchronized void doStart() {
+        phase = new Running();
+        scheduledFuture = executor.scheduleAtFixedRate(this::execute, 0, delayNanos, TimeUnit.NANOSECONDS);
+    }
+
+    private void execute() {
+        final long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+        if (elapsed < runtimeNanos) {
+            // Not completed yet: create a transaction and hook it up
+            final long txId = txCounter++;
+            final ListenableFuture<Void> execFuture = execWrite(txId);
+
+            // Ordering is important: we need to add the future before hooking the callback
+            synchronized (this) {
+                ((Running) phase).addFuture(execFuture);
+            }
+            Futures.addCallback(execFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void result) {
+                    txSuccess(execFuture, txId);
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    txFailure(execFuture, txId, cause);
+                }
+            });
+        } else {
+            startCollection();
+        }
+    }
+
+    private synchronized void startCollection() {
+        scheduledFuture.cancel(false);
+
+        final Running running = (Running) phase;
+        final Optional<Throwable> failure = running.getFailure();
+        if (failure.isPresent()) {
+            executor.shutdown();
+            runFailed(failure.get());
+            return;
+        }
+
+        LOG.debug("Reached maximum run time with {} outstanding futures", running.futures.size());
+        if (running.futures.isEmpty()) {
+            executor.shutdown();
+            runSuccessful(txCounter);
+            return;
+        }
+
+        phase = new Collecting(running.futures);
+        executor.schedule(this::checkCollection, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        executor.shutdown();
+    }
+
+    final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+        LOG.debug("Future #{} completed successfully", txId);
+        phase.txSuccess(execFuture, txId);
+    }
+
+    final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+        LOG.debug("Future #{} failed", txId, cause);
+        phase.txFailure(execFuture, txId, cause);
+    }
+
+    private synchronized void checkCollection() {
+        final Collecting collecting = (Collecting) phase;
+        if (!collecting.done) {
+            final int size = collecting.futures.size();
+            for (int i = 0; i < size; i++) {
+                final ListenableFuture<Void> future = collecting.futures.get(i);
+
+                try {
+                    future.get(0, TimeUnit.NANOSECONDS);
+                } catch (final TimeoutException e) {
+                    LOG.warn("Future #{}/{} not completed yet", i, size);
+                } catch (final ExecutionException e) {
+                    LOG.warn("Future #{}/{} failed", i, size, e.getCause());
+                } catch (final InterruptedException e) {
+                    LOG.warn("Interrupted while examining future #{}/{}", i, size, e);
+                }
+            }
+
+            runTimedOut(new TimeoutException("Collection did not finish in " + DEAD_TIMEOUT_SECONDS + " seconds"));
+        }
+    }
+
+    abstract ListenableFuture<Void> execWrite(final long txId);
+
+    abstract void runFailed(Throwable cause);
+
+    abstract void runSuccessful(long allTx);
+
+    abstract void runTimedOut(Exception cause);
+}