From 2a2a1d93bf71c5b5b341f1664f474a349e7739c9 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 10 Jul 2017 16:08:57 +0200 Subject: [PATCH] BUG-8618: fix test driver Since the test can produce bursts of completions, which in turn can get slowed down by writout of new messages, offload future completion to the executor we have internally. This in turn simplifies things, as we can rely on state being manipulated (mostly) from a single thread. Also change ArrayDeque to a HashSet to ensure removal of tasks completes quickly even in face of misordered responses. Change-Id: Ia5341633af2dbe3e26e7208436405daf7632a876 Signed-off-by: Robert Varga (cherry picked from commit 2be77b3bcef31ad8b6dbdce073471561d2cf76d6) --- .../impl/AbstractTransactionHandler.java | 24 ++++++------- .../FinalizableScheduledExecutorService.java | 36 +++++++++++++++++++ .../impl/FlappingSingletonService.java | 11 +++--- 3 files changed, 50 insertions(+), 21 deletions(-) create mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java index ea0749a2d9..d0923ce6de 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java @@ -11,10 +11,9 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.ArrayDeque; -import java.util.Queue; +import java.util.Collection; +import java.util.HashSet; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -35,7 +34,7 @@ abstract class AbstractTransactionHandler { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class); - static final int SECOND_AS_NANO = 1000000000; + static final int SECOND_AS_NANO = 1_000_000_000; //2^20 as in the model static final int MAX_ITEM = 1048576; @@ -55,10 +54,10 @@ abstract class AbstractTransactionHandler { static final long INIT_TX_TIMEOUT_SECONDS = 125; - private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5); + private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private final Queue> futures = new ArrayDeque<>(); + private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread(); + private final Collection> futures = new HashSet<>(); private final Stopwatch stopwatch = Stopwatch.createUnstarted(); private final long runtimeNanos; private final long delayNanos; @@ -99,7 +98,6 @@ abstract class AbstractTransactionHandler { state = State.WAITING; scheduledFuture.cancel(false); scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS); - executor.shutdown(); } return; @@ -122,10 +120,10 @@ abstract class AbstractTransactionHandler { public void onFailure(final Throwable cause) { txFailure(execFuture, txId, cause); } - }); + }, executor); } - final synchronized void txSuccess(final ListenableFuture execFuture, final long txId) { + final void txSuccess(final ListenableFuture execFuture, final long txId) { LOG.debug("Future #{} completed successfully", txId); futures.remove(execFuture); @@ -142,7 +140,7 @@ abstract class AbstractTransactionHandler { } } - final synchronized void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { + final void txFailure(final ListenableFuture execFuture, final long txId, final Throwable cause) { LOG.debug("Future #{} failed", txId, cause); futures.remove(execFuture); @@ -154,7 +152,6 @@ abstract class AbstractTransactionHandler { case WAITING: state = State.FAILED; scheduledFuture.cancel(false); - executor.shutdown(); runFailed(cause); break; default: @@ -162,7 +159,7 @@ abstract class AbstractTransactionHandler { } } - private synchronized void checkComplete() { + private void checkComplete() { final int size = futures.size(); if (size == 0) { return; @@ -192,7 +189,6 @@ abstract class AbstractTransactionHandler { LOG.debug("Completed waiting for all futures"); state = State.SUCCESSFUL; scheduledFuture.cancel(false); - executor.shutdown(); runSuccessful(txCounter); return true; } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java new file mode 100644 index 0000000000..19e2373653 --- /dev/null +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.clustering.it.provider.impl; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A simple ScheduledExecutorService, which shutds down its threads after a period of inactivity. It is safe to not + * shutdown this + * + * @author Robert Varga + */ +final class FinalizableScheduledExecutorService extends ScheduledThreadPoolExecutor { + + private FinalizableScheduledExecutorService(final int maxThreads, final long time, final TimeUnit unit) { + super(maxThreads); + setKeepAliveTime(time, unit); + allowCoreThreadTimeOut(true); + } + + static ScheduledThreadPoolExecutor newSingleThread() { + return new FinalizableScheduledExecutorService(1, 15, TimeUnit.SECONDS); + } + + // This is a bit ugly, but allows + @Override + protected void finalize() { + super.shutdownNow(); + } +} diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java index 5ca33ad51d..197fadad9b 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.clustering.it.provider.impl; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,15 +27,14 @@ public class FlappingSingletonService implements ClusterSingletonService { private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier.create("flapping-singleton-service"); + private static final ScheduledExecutorService EXECUTOR = FinalizableScheduledExecutorService.newSingleThread(); + private final ClusterSingletonServiceProvider singletonServiceProvider; + private final AtomicBoolean active = new AtomicBoolean(true); private volatile long flapCount = 0; - private AtomicBoolean active = new AtomicBoolean(true); - private volatile ClusterSingletonServiceRegistration registration; - private static ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); - public FlappingSingletonService(final ClusterSingletonServiceProvider singletonServiceProvider) { LOG.debug("Registering flapping-singleton-service."); @@ -74,8 +72,7 @@ public class FlappingSingletonService implements ClusterSingletonService { EXECUTOR.schedule(() -> { LOG.debug("Running re-registration"); try { - registration = - singletonServiceProvider.registerClusterSingletonService(this); + registration = singletonServiceProvider.registerClusterSingletonService(this); } catch (final Exception e) { LOG.warn("There was a problem re-registering flapping singleton service.", e); setInactive(); -- 2.36.6