package org.opendaylight.controller.clustering.it.provider.impl;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequence;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.target.rev170215.IdSequenceBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private long sequenceNumber = 1;
private long startTime;
- private SettableFuture<RpcResult<Void>> completionFuture;
private ScheduledFuture<?> scheduledFuture;
+ private Exception lastError = null;
+
public PublishNotificationsTask(final NotificationPublishService notificationPublishService,
final String notificationId, final long secondsToTake, final long maxPerSecond) {
Preconditions.checkNotNull(notificationPublishService);
notificationPublishService.putNotification(notification);
} catch (final InterruptedException e) {
LOG.warn("Unexpected exception while publishing notification, : {}", notification, e);
- completionFuture.set(RpcResultBuilder.<Void>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
+ lastError = e;
+
+ //stop on error
+ scheduledFuture.cancel(false);
+ executor.shutdown();
+ return;
}
LOG.debug("current {}, starttime: {}, timetotake: {}, current-start = {}",
current, startTime, timeToTake, current - startTime);
if ((current - startTime) > timeToTake) {
- completionFuture.set(RpcResultBuilder.<Void>success().build());
LOG.debug("Sequence number: {}", sequenceNumber);
scheduledFuture.cancel(false);
executor.shutdown();
}
}
- public void start(final SettableFuture<RpcResult<Void>> settableFuture) {
+ public void start() {
startTime = System.nanoTime();
- completionFuture = settableFuture;
scheduledFuture = executor.scheduleAtFixedRate(this, 0, delay, TimeUnit.NANOSECONDS);
}
+
+ public boolean isFinished() {
+ return scheduledFuture.isCancelled();
+ }
+
+ public long getCurrentNotif() {
+ return sequenceNumber;
+ }
+
+ public Exception getLastError() {
+ return lastError;
+ }
}