X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FAbstractTransactionHandler.java;fp=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FAbstractTransactionHandler.java;h=d0923ce6de7a5b4486f2292231c3fcc0f93dc471;hb=2a2a1d93bf71c5b5b341f1664f474a349e7739c9;hp=ea0749a2d9d11ee615381f462c69aee4d62de0d2;hpb=64bc1360aedb83583edb354444ee3e4295c7a5e6;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index ea0749a2d9..d0923ce6de 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -11,10 +11,9 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.Collection; +import java.util.HashSet; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -35,7 +34,7 @@ abstract class AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); - static final int SECOND_AS_NANO = 1000000000; + static final int SECOND_AS_NANO = 1_000_000_000; //2^20 as in the model static final int MAX_ITEM = 1048576; @@ -55,10 +54,10 @@ abstract class AbstractTransactionHandler { static final long INIT_TX_TIMEOUT_SECONDS = 125; - private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); + private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final Queue> futures = new ArrayDeque<>(); + private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread(); + private final Collection> futures = new HashSet<>(); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; @@ -99,7 +98,6 @@ abstract class AbstractTransactionHandler { state = State.WAITING; scheduledFuture.cancel(false); scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - executor.shutdown(); } return; @@ -122,10 +120,10 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }); + }, executor); } - final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { + final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); @@ -142,7 +140,7 @@ abstract class AbstractTransactionHandler { } } - final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + final void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { LOG.debug("Future #{} failed", txId, cause); futures.remove(execFuture); @@ -154,7 +152,6 @@ abstract class AbstractTransactionHandler { case WAITING: state = State.FAILED; scheduledFuture.cancel(false); - executor.shutdown(); runFailed(cause); break; default: @@ -162,7 +159,7 @@ abstract class AbstractTransactionHandler { } } - private synchronized void checkComplete() { + private void checkComplete() { final int size = futures.size(); if (size == 0) { return; @@ -192,7 +189,6 @@ abstract class AbstractTransactionHandler { LOG.debug("Completed waiting for all futures"); state = State.SUCCESSFUL; scheduledFuture.cancel(false); - executor.shutdown(); runSuccessful(txCounter); return true; }