Adjust to DOMDataTreeChangeListener update
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / impl / PublishNotificationsTask.java
index 685831db43ef210b66e110b64700966165b8295b..79250b6dd08056ea959de75104459b0e9695b21e 100644 (file)
@@ -5,21 +5,18 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.clustering.it.provider.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,19 +34,18 @@ public class PublishNotificationsTask implements Runnable {
 
     private long sequenceNumber = 1;
     private long startTime;
-    private SettableFuture<RpcResult<Void>> completionFuture;
     private ScheduledFuture<?> scheduledFuture;
 
+    private Exception lastError = null;
+
     public PublishNotificationsTask(final NotificationPublishService notificationPublishService,
                                     final String notificationId, final long secondsToTake, final long maxPerSecond) {
-        Preconditions.checkNotNull(notificationPublishService);
-        Preconditions.checkNotNull(notificationId);
-        Preconditions.checkArgument(secondsToTake > 0);
-        Preconditions.checkArgument(maxPerSecond > 0);
 
-        this.notificationPublishService = notificationPublishService;
-        this.notificationId = notificationId;
+        this.notificationPublishService = requireNonNull(notificationPublishService);
+        this.notificationId = requireNonNull(notificationId);
+        checkArgument(secondsToTake > 0);
         this.timeToTake = secondsToTake * SECOND_AS_NANO;
+        checkArgument(maxPerSecond > 0);
         this.delay = SECOND_AS_NANO / maxPerSecond;
 
         LOG.debug("Delay : {}", delay);
@@ -68,24 +64,38 @@ public class PublishNotificationsTask implements Runnable {
             notificationPublishService.putNotification(notification);
         } catch (final InterruptedException e) {
             LOG.warn("Unexpected exception while publishing notification, : {}", notification, e);
-            completionFuture.set(RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            lastError = e;
+
+            //stop on error
+            scheduledFuture.cancel(false);
+            executor.shutdown();
+            return;
         }
 
         LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}",
                 current, startTime, timeToTake, current - startTime);
 
-        if ((current - startTime) > timeToTake) {
-            completionFuture.set(RpcResultBuilder.<Void>success().build());
+        if (current - startTime > timeToTake) {
             LOG.debug("Sequence number: {}", sequenceNumber);
             scheduledFuture.cancel(false);
             executor.shutdown();
         }
     }
 
-    public void start(final SettableFuture<RpcResult<Void>> settableFuture) {
+    public void start() {
         startTime = System.nanoTime();
-        completionFuture = settableFuture;
         scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
     }
+
+    public boolean isFinished() {
+        return scheduledFuture.isCancelled();
+    }
+
+    public long getCurrentNotif() {
+        return sequenceNumber;
+    }
+
+    public Exception getLastError() {
+        return lastError;
+    }
 }