BUG-8618: fix test driver 37/60137/8
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 10 Jul 2017 14:08:57 +0000 (16:08 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 15 Jul 2017 01:35:30 +0000 (03:35 +0200)
Since the test can produce bursts of completions, which in turn can
get slowed down by writout of new messages, offload future completion
to the executor we have internally. This in turn simplifies things,
as we can rely on state being manipulated (mostly) from a single thread.

Also change ArrayDeque to a HashSet to ensure removal of tasks completes
quickly even in face of misordered responses.

Change-Id: Ia5341633af2dbe3e26e7208436405daf7632a876
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java

index ea0749a2d9d11ee615381f462c69aee4d62de0d2..d0923ce6de7a5b4486f2292231c3fcc0f93dc471 100644 (file)
@@ -11,10 +11,9 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +34,7 @@ abstract class AbstractTransactionHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
 
-    static final int SECOND_AS_NANO = 1000000000;
+    static final int SECOND_AS_NANO = 1_000_000_000;
     //2^20 as in the model
     static final int MAX_ITEM = 1048576;
 
@@ -55,10 +54,10 @@ abstract class AbstractTransactionHandler {
 
     static final long INIT_TX_TIMEOUT_SECONDS = 125;
 
-    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
 
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+    private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
+    private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
@@ -99,7 +98,6 @@ abstract class AbstractTransactionHandler {
                 state = State.WAITING;
                 scheduledFuture.cancel(false);
                 scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-                executor.shutdown();
             }
 
             return;
@@ -122,10 +120,10 @@ abstract class AbstractTransactionHandler {
             public void onFailure(final Throwable cause) {
                 txFailure(execFuture, txId, cause);
             }
-        });
+        }, executor);
     }
 
-    final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+    final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
@@ -142,7 +140,7 @@ abstract class AbstractTransactionHandler {
         }
     }
 
-    final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+    final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
@@ -154,7 +152,6 @@ abstract class AbstractTransactionHandler {
             case WAITING:
                 state = State.FAILED;
                 scheduledFuture.cancel(false);
-                executor.shutdown();
                 runFailed(cause);
                 break;
             default:
@@ -162,7 +159,7 @@ abstract class AbstractTransactionHandler {
         }
     }
 
-    private synchronized void checkComplete() {
+    private void checkComplete() {
         final int size = futures.size();
         if (size == 0) {
             return;
@@ -192,7 +189,6 @@ abstract class AbstractTransactionHandler {
             LOG.debug("Completed waiting for all futures");
             state = State.SUCCESSFUL;
             scheduledFuture.cancel(false);
-            executor.shutdown();
             runSuccessful(txCounter);
             return true;
         }
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java
new file mode 100644 (file)
index 0000000..19e2373
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple ScheduledExecutorService, which shutds down its threads after a period of inactivity. It is safe to not
+ * shutdown this
+ *
+ * @author Robert Varga
+ */
+final class FinalizableScheduledExecutorService extends ScheduledThreadPoolExecutor {
+
+    private FinalizableScheduledExecutorService(final int maxThreads, final long time, final TimeUnit unit) {
+        super(maxThreads);
+        setKeepAliveTime(time, unit);
+        allowCoreThreadTimeOut(true);
+    }
+
+    static ScheduledThreadPoolExecutor newSingleThread() {
+        return new FinalizableScheduledExecutorService(1, 15, TimeUnit.SECONDS);
+    }
+
+    // This is a bit ugly, but allows
+    @Override
+    protected void finalize() {
+        super.shutdownNow();
+    }
+}
index 5ca33ad51d63e268ce905a6cb87c44b9fb84dc81..197fadad9b64056c3f175bf609b7fc5426b508fc 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,15 +27,14 @@ public class FlappingSingletonService implements ClusterSingletonService {
     private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER =
             ServiceGroupIdentifier.create("flapping-singleton-service");
 
+    private static final ScheduledExecutorService EXECUTOR = FinalizableScheduledExecutorService.newSingleThread();
+
     private final ClusterSingletonServiceProvider singletonServiceProvider;
+    private final AtomicBoolean active = new AtomicBoolean(true);
 
     private volatile long flapCount = 0;
-    private AtomicBoolean active = new AtomicBoolean(true);
-
     private volatile ClusterSingletonServiceRegistration registration;
 
-    private static ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
-
     public FlappingSingletonService(final ClusterSingletonServiceProvider singletonServiceProvider) {
         LOG.debug("Registering flapping-singleton-service.");
 
@@ -74,8 +72,7 @@ public class FlappingSingletonService implements ClusterSingletonService {
             EXECUTOR.schedule(() -> {
                 LOG.debug("Running re-registration");
                 try {
-                    registration =
-                            singletonServiceProvider.registerClusterSingletonService(this);
+                    registration = singletonServiceProvider.registerClusterSingletonService(this);
                 } catch (final Exception e) {
                     LOG.warn("There was a problem re-registering flapping singleton service.", e);
                     setInactive();