OPNFLWPLUG-1083: Stats frozen after applying 2 sec delay in OF channel
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsPollingService.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.statistics;
9
10 import com.google.common.util.concurrent.AbstractScheduledService;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.Service;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.function.Supplier;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.openflowplugin.api.ConnectionException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 public class StatisticsPollingService extends AbstractScheduledService {
29     private static final Logger LOG = LoggerFactory.getLogger(StatisticsPollingService.class);
30     private static final long DEFAULT_STATS_TIMEOUT = 50000;
31
32     private final TimeCounter counter;
33     private final long pollingInterval;
34     private final long maximumTimerDelay;
35     private final Supplier<ListenableFuture<Boolean>> gatheringSupplier;
36     private final SettableFuture<Void> future = SettableFuture.create();
37
38     StatisticsPollingService(@NonNull final TimeCounter counter,
39                              final long pollingInterval,
40                              final long maximumTimerDelay,
41                              @NonNull final Supplier<ListenableFuture<Boolean>> gatheringSupplier) {
42         this.counter = counter;
43         this.pollingInterval = pollingInterval;
44         this.maximumTimerDelay = maximumTimerDelay;
45         this.gatheringSupplier = gatheringSupplier;
46         this.addListener(new StatisticsPollingServiceListener(), MoreExecutors.directExecutor());
47     }
48
49     ListenableFuture<Void> stop() {
50         stopAsync();
51         return future;
52     }
53
54     @Override
55     protected void startUp() {
56         counter.markStart();
57     }
58
59     @Override
60     protected void runOneIteration() throws Exception {
61         final long averageTime = counter.getAverageTimeBetweenMarks();
62         final long statsTimeout = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT;
63         final CompletableFuture<Boolean> waitFuture = new CompletableFuture<>();
64
65         Futures.addCallback(gatheringSupplier.get(), new FutureCallback<Boolean>() {
66             @Override
67             public void onSuccess(final Boolean result) {
68                 waitFuture.complete(result);
69             }
70
71             @Override
72             public void onFailure(final Throwable throwable) {
73                 waitFuture.completeExceptionally(throwable);
74             }
75         }, MoreExecutors.directExecutor());
76
77         try {
78             waitFuture.get(statsTimeout, TimeUnit.MILLISECONDS);
79         } catch (InterruptedException | ExecutionException | TimeoutException e) {
80             LOG.error("Exception occured while waiting for the stats collection.", e);
81         } finally {
82             counter.addTimeMark();
83         }
84     }
85
86     @Override
87     protected Scheduler scheduler() {
88         final long averageStatisticsGatheringTime = counter.getAverageTimeBetweenMarks();
89         long currentTimerDelay = pollingInterval;
90
91         if (averageStatisticsGatheringTime > currentTimerDelay) {
92             currentTimerDelay = averageStatisticsGatheringTime;
93
94             if (currentTimerDelay > maximumTimerDelay) {
95                 currentTimerDelay = maximumTimerDelay;
96             }
97         }
98
99         return Scheduler.newFixedDelaySchedule(currentTimerDelay, currentTimerDelay, TimeUnit.MILLISECONDS);
100     }
101
102     private final class StatisticsPollingServiceListener extends Service.Listener {
103         @Override
104         public void terminated(final Service.State from) {
105             super.terminated(from);
106             future.set(null);
107         }
108
109         @Override
110         public void failed(final Service.State from, final Throwable failure) {
111             super.failed(from, failure);
112             if (!(failure instanceof CancellationException) && !(failure instanceof ConnectionException)) {
113                 future.setException(failure);
114             } else {
115                 future.set(null);
116             }
117         }
118     }
119 }