X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsamples%2Fclustering-test-app%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fit%2Fprovider%2Fimpl%2FPublishNotificationsTask.java;h=79250b6dd08056ea959de75104459b0e9695b21e;hb=dcc776a5e749d495a66e8753e123a1ddbd15d9c6;hp=685831db43ef210b66e110b64700966165b8295b;hpb=9c17a104cd90b5529b5623d1d0136d7cc332707d;p=controller.git diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java index 685831db43..79250b6dd0 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PublishNotificationsTask.java @@ -5,21 +5,18 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.clustering.it.provider.impl; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.mdsal.binding.api.NotificationPublishService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,19 +34,18 @@ public class PublishNotificationsTask implements Runnable { private long sequenceNumber = 1; private long startTime; - private SettableFuture> completionFuture; private ScheduledFuture scheduledFuture; + private Exception lastError = null; + public PublishNotificationsTask(final NotificationPublishService notificationPublishService, final String notificationId, final long secondsToTake, final long maxPerSecond) { - Preconditions.checkNotNull(notificationPublishService); - Preconditions.checkNotNull(notificationId); - Preconditions.checkArgument(secondsToTake > 0); - Preconditions.checkArgument(maxPerSecond > 0); - this.notificationPublishService = notificationPublishService; - this.notificationId = notificationId; + this.notificationPublishService = requireNonNull(notificationPublishService); + this.notificationId = requireNonNull(notificationId); + checkArgument(secondsToTake > 0); this.timeToTake = secondsToTake * SECOND_AS_NANO; + checkArgument(maxPerSecond > 0); this.delay = SECOND_AS_NANO / maxPerSecond; LOG.debug("Delay : {}", delay); @@ -68,24 +64,38 @@ public class PublishNotificationsTask implements Runnable { notificationPublishService.putNotification(notification); } catch (final InterruptedException e) { LOG.warn("Unexpected exception while publishing notification, : {}", notification, e); - completionFuture.set(RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build()); + lastError = e; + + //stop on error + scheduledFuture.cancel(false); + executor.shutdown(); + return; } LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}", current, startTime, timeToTake, current - startTime); - if ((current - startTime) > timeToTake) { - completionFuture.set(RpcResultBuilder.success().build()); + if (current - startTime > timeToTake) { LOG.debug("Sequence number: {}", sequenceNumber); scheduledFuture.cancel(false); executor.shutdown(); } } - public void start(final SettableFuture> settableFuture) { + public void start() { startTime = System.nanoTime(); - completionFuture = settableFuture; scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS); } + + public boolean isFinished() { + return scheduledFuture.isCancelled(); + } + + public long getCurrentNotif() { + return sequenceNumber; + } + + public Exception getLastError() { + return lastError; + } }