Bug 8015, Bug 7800: Do not block when publishing notifications
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / PublishNotificationsTask.java
index 685831db43ef210b66e110b64700966165b8295b..bd755d40c8b24fc87cb1ff9b608ca1b0970fb8c1 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -17,9 +16,6 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +33,10 @@ public class PublishNotificationsTask implements Runnable {
 
     private long sequenceNumber = 1;
     private long startTime;
-    private SettableFuture<RpcResult<Void>> completionFuture;
     private ScheduledFuture<?> scheduledFuture;
 
+    private Exception lastError = null;
+
     public PublishNotificationsTask(final NotificationPublishService notificationPublishService,
                                     final String notificationId, final long secondsToTake, final long maxPerSecond) {
         Preconditions.checkNotNull(notificationPublishService);
@@ -68,24 +65,38 @@ public class PublishNotificationsTask implements Runnable {
             notificationPublishService.putNotification(notification);
         } catch (final InterruptedException e) {
             LOG.warn("Unexpected exception while publishing notification, : {}", notification, e);
-            completionFuture.set(RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            lastError = e;
+
+            //stop on error
+            scheduledFuture.cancel(false);
+            executor.shutdown();
+            return;
         }
 
         LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}",
                 current, startTime, timeToTake, current - startTime);
 
         if ((current - startTime) > timeToTake) {
-            completionFuture.set(RpcResultBuilder.<Void>success().build());
             LOG.debug("Sequence number: {}", sequenceNumber);
             scheduledFuture.cancel(false);
             executor.shutdown();
         }
     }
 
-    public void start(final SettableFuture<RpcResult<Void>> settableFuture) {
+    public void start() {
         startTime = System.nanoTime();
-        completionFuture = settableFuture;
         scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
     }
+
+    public boolean isFinished() {
+        return scheduledFuture.isCancelled();
+    }
+
+    public long getCurrentNotif() {
+        return sequenceNumber;
+    }
+
+    public Exception getLastError() {
+        return lastError;
+    }
 }