BUG-8618: fix test driver 37/60437/1
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 10 Jul 2017 14:08:57 +0000 (16:08 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 16 Jul 2017 21:39:01 +0000 (23:39 +0200)
Since the test can produce bursts of completions, which in turn can
get slowed down by writout of new messages, offload future completion
to the executor we have internally. This in turn simplifies things,
as we can rely on state being manipulated (mostly) from a single thread.

Also change ArrayDeque to a HashSet to ensure removal of tasks completes
quickly even in face of misordered responses.

Change-Id: Ia5341633af2dbe3e26e7208436405daf7632a876
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 2be77b3bcef31ad8b6dbdce073471561d2cf76d6)

opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/AbstractTransactionHandler.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java [new file with mode: 0644]
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FlappingSingletonService.java

index ea0749a..d0923ce 100644 (file)
@@ -11,10 +11,9 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -35,7 +34,7 @@ abstract class AbstractTransactionHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionHandler.class);
 
-    static final int SECOND_AS_NANO = 1000000000;
+    static final int SECOND_AS_NANO = 1_000_000_000;
     //2^20 as in the model
     static final int MAX_ITEM = 1048576;
 
@@ -55,10 +54,10 @@ abstract class AbstractTransactionHandler {
 
     static final long INIT_TX_TIMEOUT_SECONDS = 125;
 
-    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(5);
+    private static final long DEAD_TIMEOUT_SECONDS = TimeUnit.MINUTES.toSeconds(15);
 
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private final Queue<ListenableFuture<Void>> futures = new ArrayDeque<>();
+    private final ScheduledExecutorService executor = FinalizableScheduledExecutorService.newSingleThread();
+    private final Collection<ListenableFuture<Void>> futures = new HashSet<>();
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
     private final long runtimeNanos;
     private final long delayNanos;
@@ -99,7 +98,6 @@ abstract class AbstractTransactionHandler {
                 state = State.WAITING;
                 scheduledFuture.cancel(false);
                 scheduledFuture = executor.schedule(this::checkComplete, DEAD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-                executor.shutdown();
             }
 
             return;
@@ -122,10 +120,10 @@ abstract class AbstractTransactionHandler {
             public void onFailure(final Throwable cause) {
                 txFailure(execFuture, txId, cause);
             }
-        });
+        }, executor);
     }
 
-    final synchronized void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
+    final void txSuccess(final ListenableFuture<Void> execFuture, final long txId) {
         LOG.debug("Future #{} completed successfully", txId);
         futures.remove(execFuture);
 
@@ -142,7 +140,7 @@ abstract class AbstractTransactionHandler {
         }
     }
 
-    final synchronized void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
+    final void txFailure(final ListenableFuture<Void> execFuture, final long txId, final Throwable cause) {
         LOG.debug("Future #{} failed", txId, cause);
         futures.remove(execFuture);
 
@@ -154,7 +152,6 @@ abstract class AbstractTransactionHandler {
             case WAITING:
                 state = State.FAILED;
                 scheduledFuture.cancel(false);
-                executor.shutdown();
                 runFailed(cause);
                 break;
             default:
@@ -162,7 +159,7 @@ abstract class AbstractTransactionHandler {
         }
     }
 
-    private synchronized void checkComplete() {
+    private void checkComplete() {
         final int size = futures.size();
         if (size == 0) {
             return;
@@ -192,7 +189,6 @@ abstract class AbstractTransactionHandler {
             LOG.debug("Completed waiting for all futures");
             state = State.SUCCESSFUL;
             scheduledFuture.cancel(false);
-            executor.shutdown();
             runSuccessful(txCounter);
             return true;
         }
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/FinalizableScheduledExecutorService.java
new file mode 100644 (file)
index 0000000..19e2373
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple ScheduledExecutorService, which shutds down its threads after a period of inactivity. It is safe to not
+ * shutdown this
+ *
+ * @author Robert Varga
+ */
+final class FinalizableScheduledExecutorService extends ScheduledThreadPoolExecutor {
+
+    private FinalizableScheduledExecutorService(final int maxThreads, final long time, final TimeUnit unit) {
+        super(maxThreads);
+        setKeepAliveTime(time, unit);
+        allowCoreThreadTimeOut(true);
+    }
+
+    static ScheduledThreadPoolExecutor newSingleThread() {
+        return new FinalizableScheduledExecutorService(1, 15, TimeUnit.SECONDS);
+    }
+
+    // This is a bit ugly, but allows
+    @Override
+    protected void finalize() {
+        super.shutdownNow();
+    }
+}
index 5ca33ad..197fada 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,15 +27,14 @@ public class FlappingSingletonService implements ClusterSingletonService {
     private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER =
             ServiceGroupIdentifier.create("flapping-singleton-service");
 
+    private static final ScheduledExecutorService EXECUTOR = FinalizableScheduledExecutorService.newSingleThread();
+
     private final ClusterSingletonServiceProvider singletonServiceProvider;
+    private final AtomicBoolean active = new AtomicBoolean(true);
 
     private volatile long flapCount = 0;
-    private AtomicBoolean active = new AtomicBoolean(true);
-
     private volatile ClusterSingletonServiceRegistration registration;
 
-    private static ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
-
     public FlappingSingletonService(final ClusterSingletonServiceProvider singletonServiceProvider) {
         LOG.debug("Registering flapping-singleton-service.");
 
@@ -74,8 +72,7 @@ public class FlappingSingletonService implements ClusterSingletonService {
             EXECUTOR.schedule(() -> {
                 LOG.debug("Running re-registration");
                 try {
-                    registration =
-                            singletonServiceProvider.registerClusterSingletonService(this);
+                    registration = singletonServiceProvider.registerClusterSingletonService(this);
                 } catch (final Exception e) {
                     LOG.warn("There was a problem re-registering flapping singleton service.", e);
                     setInactive();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.