2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.statistics;
10 import com.google.common.util.concurrent.AbstractScheduledService;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.Service;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.function.Supplier;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.openflowplugin.api.ConnectionException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
/**
 * Periodic statistics poller built on Guava's {@code AbstractScheduledService}.
 *
 * <p>Each iteration asks {@code gatheringSupplier} for a gathering future, waits for it with a
 * timeout, and records a time mark on {@code counter}. The delay between iterations is derived
 * from {@code pollingInterval}, the counter's average time between marks, and
 * {@code maximumTimerDelay}.
 */
28 public class StatisticsPollingService extends AbstractScheduledService {
29 private static final Logger LOG = LoggerFactory.getLogger(StatisticsPollingService.class);
// Fallback wait, in milliseconds, used by runOneIteration() when the counter reports no
// positive average time between marks.
30 private static final long DEFAULT_STATS_TIMEOUT = 50000;
// Source of the average time between marks; receives one mark per iteration.
32 private final TimeCounter counter;
// Base delay between iterations, in milliseconds (see scheduler()).
33 private final long pollingInterval;
// Cap applied to the computed delay, in milliseconds (see scheduler()).
34 private final long maximumTimerDelay;
// Invoked once per iteration to start statistics gathering and obtain its result future.
35 private final Supplier<ListenableFuture<Boolean>> gatheringSupplier;
// Completed exceptionally by the service listener on unexpected failure.
// NOTE(review): presumably this is also what stop() returns and what terminated() completes;
// those bodies are not visible here - confirm.
36 private final SettableFuture<Void> future = SettableFuture.create();
/**
 * Creates the polling service (package-private) and registers its lifecycle listener.
 *
 * @param counter time counter queried for the average time between marks and marked on each iteration
 * @param pollingInterval base delay between iterations, in milliseconds
 * @param maximumTimerDelay cap for the computed delay, in milliseconds
 * @param gatheringSupplier supplier invoked on every iteration to obtain the gathering future
 */
38 StatisticsPollingService(@NonNull final TimeCounter counter,
39 final long pollingInterval,
40 final long maximumTimerDelay,
41 @NonNull final Supplier<ListenableFuture<Boolean>> gatheringSupplier) {
42 this.counter = counter;
43 this.pollingInterval = pollingInterval;
44 this.maximumTimerDelay = maximumTimerDelay;
45 this.gatheringSupplier = gatheringSupplier;
// The listener runs on a direct executor, i.e. on the thread that performs the state transition.
46 this.addListener(new StatisticsPollingServiceListener(), MoreExecutors.directExecutor());
// Package-private stop hook returning a ListenableFuture<Void>.
// NOTE(review): the method body is not visible in this listing; presumably it requests
// service shutdown and returns the `future` field - confirm against the full source.
49 ListenableFuture<Void> stop() {
// Start-up lifecycle hook; its signature matches AbstractScheduledService#startUp().
// NOTE(review): the method body is not visible in this listing - confirm what it does.
55 protected void startUp() {
/**
 * Runs one polling iteration: starts statistics gathering, waits for it with a timeout,
 * and records a time mark on the counter.
 *
 * <p>NOTE(review): the opening {@code try} and several closing braces are on lines that this
 * listing does not show; the comments below describe only the visible statements.
 *
 * @throws Exception declared by the signature; the visible wait-related exceptions are caught and logged
 */
60 protected void runOneIteration() throws Exception {
61 final long averageTime = counter.getAverageTimeBetweenMarks();
// Wait up to three times the observed average; fall back to the default when no positive
// average is available.
62 final long statsTimeout = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT;
// Bridge to a CompletableFuture so the result can be awaited with a timeout below.
63 final CompletableFuture<Boolean> waitFuture = new CompletableFuture<>();
// Start gathering and forward its outcome (value or failure) to waitFuture.
65 Futures.addCallback(gatheringSupplier.get(), new FutureCallback<Boolean>() {
67 public void onSuccess(final Boolean result) {
68 waitFuture.complete(result);
72 public void onFailure(final Throwable throwable) {
73 waitFuture.completeExceptionally(throwable);
75 }, MoreExecutors.directExecutor());
// Block this iteration until gathering finishes or statsTimeout milliseconds elapse.
78 waitFuture.get(statsTimeout, TimeUnit.MILLISECONDS);
// Interruption, gathering failure and timeout are all logged rather than rethrown.
// NOTE(review): the visible lines neither restore the thread's interrupt status after
// InterruptedException nor cancel the gathering future on timeout - confirm this is intended.
79 } catch (InterruptedException | ExecutionException | TimeoutException e) {
// NOTE(review): "occured" in the message below is a misspelling of "occurred".
80 LOG.error("Exception occured while waiting for the stats collection.", e);
// Record a mark for this iteration.
// NOTE(review): this appears to follow the catch block, so it would run on success and on
// failure alike; the closing brace is not visible here - confirm.
82 counter.addTimeMark();
/**
 * Builds the fixed-delay schedule for this service.
 *
 * <p>The delay starts at {@code pollingInterval}, is raised to the counter's average time
 * between marks when that average is larger, and is limited to {@code maximumTimerDelay}.
 * NOTE(review): the closing braces are not visible in this listing, so whether the cap is
 * applied only inside the first branch cannot be confirmed from here.
 *
 * <p>NOTE(review): Guava documents {@code scheduler()} as being called only once, so the delay
 * would be fixed from the counter state at that moment - confirm against the Guava version used.
 *
 * @return a fixed-delay schedule using the computed value, in milliseconds, for both the
 *         initial delay and the delay between iterations
 */
87 protected Scheduler scheduler() {
88 final long averageStatisticsGatheringTime = counter.getAverageTimeBetweenMarks();
89 long currentTimerDelay = pollingInterval;
// Slow down when gathering takes longer on average than the configured interval.
91 if (averageStatisticsGatheringTime > currentTimerDelay) {
92 currentTimerDelay = averageStatisticsGatheringTime;
// Never exceed the configured maximum delay.
94 if (currentTimerDelay > maximumTimerDelay) {
95 currentTimerDelay = maximumTimerDelay;
99 return Scheduler.newFixedDelaySchedule(currentTimerDelay, currentTimerDelay, TimeUnit.MILLISECONDS);
/**
 * Service lifecycle listener that reports the service outcome through the enclosing
 * instance's {@code future}.
 *
 * <p>NOTE(review): parts of this class (the remainder of {@code terminated}, any else-branch
 * of {@code failed}, and the closing braces) are not visible in this listing.
 */
102 private final class StatisticsPollingServiceListener extends Service.Listener {
// Called on the TERMINATED transition; only the super call is visible here.
// NOTE(review): presumably also completes `future` - confirm.
104 public void terminated(final Service.State from) {
105 super.terminated(from);
// Called on the FAILED transition.
110 public void failed(final Service.State from, final Throwable failure) {
111 super.failed(from, failure);
// Cancellation and connection failures are not propagated as errors; any other failure
// completes `future` exceptionally.
// NOTE(review): how `future` is completed for the two excluded types is not visible here.
112 if (!(failure instanceof CancellationException) && !(failure instanceof ConnectionException)) {
113 future.setException(failure);