Bug 8015, Bug 7800: Do not block when publishing notifications 76/53476/5
authorVratko Polak <vrpolak@cisco.com>
Fri, 17 Mar 2017 14:07:03 +0000 (15:07 +0100)
committerTom Pantelis <tompantelis@gmail.com>
Thu, 23 Mar 2017 11:16:05 +0000 (11:16 +0000)
+ Yang model edited.
+ check-publish-notificatons implemented.

Change-Id: I757269a61bb819d2abcb07f6106b5e2ed7a34dec
Signed-off-by: Vratko Polak <vrpolak@cisco.com>
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java

index 2a5299d761e328624019d9ac021fc1c5accb9df8..1420a0b8e9b9948a07d0aa3e385f8b9ab5104589 100644 (file)
@@ -124,11 +124,13 @@ module odl-mdsal-lowlevel-control {
         }
     }
 
         }
     }
 
-    rpc publish-notifications {
-        description "Upon receiving this, the member shall start publishing llt:id-sequence
-            notifications with the given id and sequence numbers, increasing, from 1.
-            The RPC shall not return until all publishes are successful,
-            or an exception is raised (the exception should propagate to restconf response).";
+    rpc start-publish-notifications {
+        description "Upon receiving this, the member checks whether it is already in the middle of publishing,
+            for this id, and fails if yes. If no, the member shall clear any state tracking data possibly present
+            from the previous call wth this id, and start publishing llt:id-sequence
+            notifications with the given id and sequence numbers increasing from 1.
+            The RPC shall return immediatelly before the first notification is published.
+            The publishing task stops on first error of after the given time.";
         input {
             uses llc:id-grouping;
             leaf seconds {
         input {
             uses llc:id-grouping;
             leaf seconds {
@@ -145,6 +147,35 @@ module odl-mdsal-lowlevel-control {
         // No output.
     }
 
         // No output.
     }
 
+    rpc check-publish-notifications {
+        description "Upon receiving this, the member shall immediatelly return
+            the current tracking data related to the current (or previous) task
+            started by start-publish-notifications with this id.";
+        input {
+            uses llc:id-grouping;
+        }
+        output {
+            leaf active {
+                description "True if a publishing task for this id is running, false otherwise.";
+                mandatory true;
+                type boolean;
+            }
+            leaf publish-count {
+                description "How many notifications were published for this id since last start.
+                    If there was no start-publish-notifications call for this id, this leaf is absent.";
+                mandatory false;
+                type int64;
+            }
+            leaf last-error {
+                description "If no task has been started by start-publish-notifications for this id,
+                    or if the last such call has not encountered an error, this leaf is absent.
+                    Otherwise it contains a string message from the last error, including stacktrace if possible.";
+                mandatory false;
+                type string;
+            }
+        }
+    }
+
     rpc subscribe-ynl {
         description "Upon receiving this, the member checks whether it has already subscribed
             a yang listener for the given id, and fails if yes.
     rpc subscribe-ynl {
         description "Upon receiving this, the member checks whether it has already subscribed
             a yang listener for the given id, and fails if yes.
index 4ff42c14a1c6c76e1a5446811fca0e7d2fdc1758..6b94983b6f360129beead2ebb82c6ad8f1bdfe2c 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.clustering.it.provider;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -34,15 +36,18 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegist
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.PublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
@@ -84,6 +89,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
     private FlappingSingletonService flappingSingletonService;
+    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
 
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
 
     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                      final DOMRpcProviderService domRpcService,
@@ -132,16 +138,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
     }
 
     @Override
-    public Future<RpcResult<Void>> publishNotifications(final PublishNotificationsInput input) {
+    public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
         LOG.debug("publish-notifications, input: {}", input);
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
 
         LOG.debug("publish-notifications, input: {}", input);
 
         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
                 input.getSeconds(), input.getNotificationsPerSecond());
 
-        final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
-        task.start(settableFuture);
+        publishNotificationsTasks.put(input.getId(), task);
 
 
-        return settableFuture;
+        task.start();
+
+        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
     }
 
     @Override
     }
 
     @Override
@@ -361,6 +368,33 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
     }
 
         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
     }
 
+    @Override
+    public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
+            final CheckPublishNotificationsInput input) {
+
+        final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
+
+        if (task == null) {
+            return Futures.immediateFuture(RpcResultBuilder.success(
+                    new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
+        }
+
+        final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
+                new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
+
+        if (task.getLastError() != null) {
+            final StringWriter sw = new StringWriter();
+            final PrintWriter pw = new PrintWriter(sw);
+            task.getLastError().printStackTrace(pw);
+            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
+        }
+
+        final CheckPublishNotificationsOutput output =
+                checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
+
+        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
+    }
+
     @Override
     public Future<RpcResult<Void>> produceTransactions(ProduceTransactionsInput input) {
         return null;
     @Override
     public Future<RpcResult<Void>> produceTransactions(ProduceTransactionsInput input) {
         return null;
index 685831db43ef210b66e110b64700966165b8295b..bd755d40c8b24fc87cb1ff9b608ca1b0970fb8c1 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.clustering.it.provider.impl;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -17,9 +16,6 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +33,10 @@ public class PublishNotificationsTask implements Runnable {
 
     private long sequenceNumber = 1;
     private long startTime;
 
     private long sequenceNumber = 1;
     private long startTime;
-    private SettableFuture<RpcResult<Void>> completionFuture;
     private ScheduledFuture<?> scheduledFuture;
 
     private ScheduledFuture<?> scheduledFuture;
 
+    private Exception lastError = null;
+
     public PublishNotificationsTask(final NotificationPublishService notificationPublishService,
                                     final String notificationId, final long secondsToTake, final long maxPerSecond) {
         Preconditions.checkNotNull(notificationPublishService);
     public PublishNotificationsTask(final NotificationPublishService notificationPublishService,
                                     final String notificationId, final long secondsToTake, final long maxPerSecond) {
         Preconditions.checkNotNull(notificationPublishService);
@@ -68,24 +65,38 @@ public class PublishNotificationsTask implements Runnable {
             notificationPublishService.putNotification(notification);
         } catch (final InterruptedException e) {
             LOG.warn("Unexpected exception while publishing notification, : {}", notification, e);
             notificationPublishService.putNotification(notification);
         } catch (final InterruptedException e) {
             LOG.warn("Unexpected exception while publishing notification, : {}", notification, e);
-            completionFuture.set(RpcResultBuilder.<Void>failed()
-                    .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+            lastError = e;
+
+            //stop on error
+            scheduledFuture.cancel(false);
+            executor.shutdown();
+            return;
         }
 
         LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}",
                 current, startTime, timeToTake, current - startTime);
 
         if ((current - startTime) > timeToTake) {
         }
 
         LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}",
                 current, startTime, timeToTake, current - startTime);
 
         if ((current - startTime) > timeToTake) {
-            completionFuture.set(RpcResultBuilder.<Void>success().build());
             LOG.debug("Sequence number: {}", sequenceNumber);
             scheduledFuture.cancel(false);
             executor.shutdown();
         }
     }
 
             LOG.debug("Sequence number: {}", sequenceNumber);
             scheduledFuture.cancel(false);
             executor.shutdown();
         }
     }
 
-    public void start(final SettableFuture<RpcResult<Void>> settableFuture) {
+    public void start() {
         startTime = System.nanoTime();
         startTime = System.nanoTime();
-        completionFuture = settableFuture;
         scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
     }
         scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
     }
+
+    public boolean isFinished() {
+        return scheduledFuture.isCancelled();
+    }
+
+    public long getCurrentNotif() {
+        return sequenceNumber;
+    }
+
+    public Exception getLastError() {
+        return lastError;
+    }
 }
 }