From: Vratko Polak Date: Fri, 17 Mar 2017 14:07:03 +0000 (+0100) Subject: Bug 8015, Bug 7800: Do not block when publishing notifications X-Git-Tag: release/carbon~143 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=350d31783d2e7ebcaaa9cc48b572a4d1f2974650 Bug 8015, Bug 7800: Do not block when publishing notifications + Yang model edited. + check-publish-notificatons implemented. Change-Id: I757269a61bb819d2abcb07f6106b5e2ed7a34dec Signed-off-by: Vratko Polak Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang index 2a5299d761..1420a0b8e9 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang +++ b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang @@ -124,11 +124,13 @@ module odl-mdsal-lowlevel-control { } } - rpc publish-notifications { - description "Upon receiving this, the member shall start publishing llt:id-sequence - notifications with the given id and sequence numbers, increasing, from 1. - The RPC shall not return until all publishes are successful, - or an exception is raised (the exception should propagate to restconf response)."; + rpc start-publish-notifications { + description "Upon receiving this, the member checks whether it is already in the middle of publishing, + for this id, and fails if yes. If no, the member shall clear any state tracking data possibly present + from the previous call wth this id, and start publishing llt:id-sequence + notifications with the given id and sequence numbers increasing from 1. + The RPC shall return immediatelly before the first notification is published. + The publishing task stops on first error of after the given time."; input { uses llc:id-grouping; leaf seconds { @@ -145,6 +147,35 @@ module odl-mdsal-lowlevel-control { // No output. } + rpc check-publish-notifications { + description "Upon receiving this, the member shall immediatelly return + the current tracking data related to the current (or previous) task + started by start-publish-notifications with this id."; + input { + uses llc:id-grouping; + } + output { + leaf active { + description "True if a publishing task for this id is running, false otherwise."; + mandatory true; + type boolean; + } + leaf publish-count { + description "How many notifications were published for this id since last start. + If there was no start-publish-notifications call for this id, this leaf is absent."; + mandatory false; + type int64; + } + leaf last-error { + description "If no task has been started by start-publish-notifications for this id, + or if the last such call has not encountered an error, this leaf is absent. + Otherwise it contains a string message from the last error, including stacktrace if possible."; + mandatory false; + type string; + } + } + } + rpc subscribe-ynl { description "Upon receiving this, the member checks whether it has already subscribed a yang listener for the given id, and fails if yes. diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index 4ff42c14a1..6b94983b6f 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -10,6 +10,8 @@ package org.opendaylight.controller.clustering.it.provider; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; @@ -34,15 +36,18 @@ import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegist import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.PublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput; @@ -84,6 +89,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private DOMRpcImplementationRegistration globalGetConstantRegistration = null; private ClusterSingletonServiceRegistration getSingletonConstantRegistration; private FlappingSingletonService flappingSingletonService; + private Map publishNotificationsTasks = new HashMap<>(); public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry, final DOMRpcProviderService domRpcService, @@ -132,16 +138,17 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> publishNotifications(final PublishNotificationsInput input) { + public Future> startPublishNotifications(final StartPublishNotificationsInput input) { LOG.debug("publish-notifications, input: {}", input); final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(), input.getSeconds(), input.getNotificationsPerSecond()); - final SettableFuture> settableFuture = SettableFuture.create(); - task.start(settableFuture); + publishNotificationsTasks.put(input.getId(), task); - return settableFuture; + task.start(); + + return Futures.immediateFuture(RpcResultBuilder.success().build()); } @Override @@ -361,6 +368,33 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return Futures.immediateFuture(RpcResultBuilder.success().withResult(output).build()); } + @Override + public Future> checkPublishNotifications( + final CheckPublishNotificationsInput input) { + + final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId()); + + if (task == null) { + return Futures.immediateFuture(RpcResultBuilder.success( + new CheckPublishNotificationsOutputBuilder().setActive(false)).build()); + } + + final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder = + new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished()); + + if (task.getLastError() != null) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw); + task.getLastError().printStackTrace(pw); + checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString()); + } + + final CheckPublishNotificationsOutput output = + checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build(); + + return Futures.immediateFuture(RpcResultBuilder.success(output).build()); + } + @Override public Future> produceTransactions(ProduceTransactionsInput input) { return null; diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java index 685831db43..bd755d40c8 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.clustering.it.provider.impl; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -17,9 +16,6 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +33,10 @@ public class PublishNotificationsTask implements Runnable { private long sequenceNumber = 1; private long startTime; - private SettableFuture> completionFuture; private ScheduledFuture scheduledFuture; + private Exception lastError = null; + public PublishNotificationsTask(final NotificationPublishService notificationPublishService, final String notificationId, final long secondsToTake, final long maxPerSecond) { Preconditions.checkNotNull(notificationPublishService); @@ -68,24 +65,38 @@ public class PublishNotificationsTask implements Runnable { notificationPublishService.putNotification(notification); } catch (final InterruptedException e) { LOG.warn("Unexpected exception while publishing notification, : {}", notification, e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + lastError = e; + + //stop on error + scheduledFuture.cancel(false); + executor.shutdown(); + return; } LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}", current, startTime, timeToTake, current - startTime); if ((current - startTime) > timeToTake) { - completionFuture.set(RpcResultBuilder.success().build()); LOG.debug("Sequence number: {}", sequenceNumber); scheduledFuture.cancel(false); executor.shutdown(); } } - public void start(final SettableFuture> settableFuture) { + public void start() { startTime = System.nanoTime(); - completionFuture = settableFuture; scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); } + + public boolean isFinished() { + return scheduledFuture.isCancelled(); + } + + public long getCurrentNotif() { + return sequenceNumber; + } + + public Exception getLastError() { + return lastError; + } }