35a0d73778b3b4ab96f40a552cfc840500cbb7ec
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsManagerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import io.netty.util.HashedWheelTimer;
19 import io.netty.util.Timeout;
20 import io.netty.util.TimerTask;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.Semaphore;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode;
48 import org.opendaylight.yangtools.yang.common.RpcError;
49 import org.opendaylight.yangtools.yang.common.RpcResult;
50 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
55
56     private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
57
58     private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
59     private final ConvertorExecutor converterExecutor;
60
61     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
62     private DeviceTerminationPhaseHandler deviceTerminationPhaseHandler;
63
64     private final ConcurrentMap<DeviceInfo, StatisticsContext> contexts = new ConcurrentHashMap<>();
65
66     private static long basicTimerDelay;
67     private static long currentTimerDelay;
68     private static long maximumTimerDelay; //wait time for next statistics
69
70     private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
71     private final Semaphore workModeGuard = new Semaphore(1, true);
72     private boolean isStatisticsPollingOn;
73     private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
74
75     private final HashedWheelTimer hashedWheelTimer;
76
77     @Override
78     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
79         deviceInitPhaseHandler = handler;
80     }
81
82     public StatisticsManagerImpl(final RpcProviderRegistry rpcProviderRegistry,
83                                  final boolean isStatisticsPollingOn,
84                                  final HashedWheelTimer hashedWheelTimer,
85                                  final ConvertorExecutor convertorExecutor,
86                                  final long basicTimerDelay,
87                                  final long maximumTimerDelay) {
88         Preconditions.checkArgument(rpcProviderRegistry != null);
89             this.converterExecutor = convertorExecutor;
90         this.controlServiceRegistration = Preconditions.checkNotNull(
91                 rpcProviderRegistry.addRpcImplementation(StatisticsManagerControlService.class, this)
92         );
93         this.isStatisticsPollingOn = isStatisticsPollingOn;
94         this.basicTimerDelay = basicTimerDelay;
95         this.currentTimerDelay = basicTimerDelay;
96         this.maximumTimerDelay = maximumTimerDelay;
97         this.hashedWheelTimer = hashedWheelTimer;
98     }
99
100     @Override
101     public void onDeviceContextLevelUp(final DeviceInfo deviceInfo,
102                                        final LifecycleService lifecycleService) throws Exception {
103
104         final StatisticsContext statisticsContext =
105                 new StatisticsContextImpl(
106                         deviceInfo,
107                         isStatisticsPollingOn,
108                         lifecycleService,
109                         converterExecutor,
110                         this);
111
112         Verify.verify(
113                 contexts.putIfAbsent(deviceInfo, statisticsContext) == null,
114                 "StatisticsCtx still not closed for Node {}", deviceInfo.getLOGValue()
115         );
116
117         lifecycleService.setStatContext(statisticsContext);
118         lifecycleService.registerDeviceRemovedHandler(this);
119         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService);
120     }
121
122     @VisibleForTesting
123     void pollStatistics(final DeviceState deviceState,
124                         final StatisticsContext statisticsContext,
125                         final TimeCounter timeCounter,
126                         final DeviceInfo deviceInfo) {
127
128         if (!statisticsContext.isSchedulingEnabled()) {
129             if (LOG.isDebugEnabled()) {
130                 LOG.debug("Disabled statistics scheduling for device: {}", deviceInfo.getNodeId().getValue());
131             }
132             return;
133         }
134
135         if (LOG.isDebugEnabled()) {
136             LOG.debug("POLLING ALL STATISTICS for device: {}", deviceInfo.getNodeId());
137         }
138
139         timeCounter.markStart();
140         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
141         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
142             @Override
143             public void onSuccess(final Boolean o) {
144                 timeCounter.addTimeMark();
145                 calculateTimerDelay(timeCounter);
146                 scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
147             }
148
149             @Override
150             public void onFailure(@Nonnull final Throwable throwable) {
151                 timeCounter.addTimeMark();
152                 LOG.warn("Statistics gathering for single node {} was not successful: {}", deviceInfo.getLOGValue(),
153                         throwable.getMessage());
154                 if (LOG.isTraceEnabled()) {
155                     LOG.trace("Gathering for node {} failure: ", deviceInfo.getLOGValue(), throwable);
156                 }
157                 calculateTimerDelay(timeCounter);
158                 if (throwable instanceof IllegalStateException) {
159                     stopScheduling(deviceInfo);
160                 } else {
161                     scheduleNextPolling(deviceState, deviceInfo, statisticsContext, timeCounter);
162                 }
163             }
164         });
165
166         final long averageTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
167         final long statsTimeoutSec = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT_SEC;
168         final TimerTask timerTask = timeout -> {
169             if (!deviceStatisticsCollectionFuture.isDone()) {
170                 LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceInfo.getLOGValue(), statsTimeoutSec);
171                 deviceStatisticsCollectionFuture.cancel(true);
172             }
173         };
174
175         hashedWheelTimer.newTimeout(timerTask, statsTimeoutSec, TimeUnit.SECONDS);
176     }
177
178     private void scheduleNextPolling(final DeviceState deviceState,
179                                      final DeviceInfo deviceInfo,
180                                      final StatisticsContext statisticsContext,
181                                      final TimeCounter timeCounter) {
182         if (LOG.isDebugEnabled()) {
183             LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceInfo.getNodeId());
184         }
185         if (isStatisticsPollingOn) {
186             final Timeout pollTimeout = hashedWheelTimer.newTimeout(
187                     timeout -> pollStatistics(
188                             deviceState,
189                             statisticsContext,
190                             timeCounter,
191                             deviceInfo),
192                     currentTimerDelay,
193                     TimeUnit.MILLISECONDS);
194             statisticsContext.setPollTimeout(pollTimeout);
195         }
196     }
197
198     @VisibleForTesting
199     void calculateTimerDelay(final TimeCounter timeCounter) {
200         final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
201         if (averageStatisticsGatheringTime > currentTimerDelay) {
202             currentTimerDelay *= 2;
203             if (currentTimerDelay > maximumTimerDelay) {
204                 currentTimerDelay = maximumTimerDelay;
205             }
206         } else {
207             if (currentTimerDelay > basicTimerDelay) {
208                 currentTimerDelay /= 2;
209             } else {
210                 currentTimerDelay = basicTimerDelay;
211             }
212         }
213     }
214
215     @VisibleForTesting
216     static long getCurrentTimerDelay() {
217         return currentTimerDelay;
218     }
219
220     @Override
221     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
222         Optional.ofNullable(contexts.get(deviceInfo)).ifPresent(OFPContext::close);
223         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
224     }
225
226     @Override
227     public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
228         final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
229         smModeOutputBld.setMode(workMode);
230         return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
231     }
232
233     @Override
234     public Future<RpcResult<Void>> changeStatisticsWorkMode(ChangeStatisticsWorkModeInput input) {
235         final Future<RpcResult<Void>> result;
236         // acquire exclusive access
237         if (workModeGuard.tryAcquire()) {
238             final StatisticsWorkMode targetWorkMode = input.getMode();
239             if (!workMode.equals(targetWorkMode)) {
240                 isStatisticsPollingOn = !(StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode));
241                 // iterate through stats-ctx: propagate mode
242                 for (Map.Entry<DeviceInfo, StatisticsContext> entry : contexts.entrySet()) {
243                     final DeviceInfo deviceInfo = entry.getKey();
244                     final StatisticsContext statisticsContext = entry.getValue();
245                     final DeviceContext deviceContext = statisticsContext.gainDeviceContext();
246                     switch (targetWorkMode) {
247                         case COLLECTALL:
248                             scheduleNextPolling(statisticsContext.gainDeviceState(), deviceInfo, statisticsContext, new TimeCounter());
249                             for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
250                                 lifeCycleSource.setItemLifecycleListener(null);
251                             }
252                             break;
253                         case FULLYDISABLED:
254                             final Optional<Timeout> pollTimeout = statisticsContext.getPollTimeout();
255                             if (pollTimeout.isPresent()) {
256                                 pollTimeout.get().cancel();
257                             }
258                             for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
259                                 lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
260                             }
261                             break;
262                         default:
263                             LOG.warn("Statistics work mode not supported: {}", targetWorkMode);
264                     }
265                 }
266                 workMode = targetWorkMode;
267             }
268             workModeGuard.release();
269             result = RpcResultBuilder.<Void>success().buildFuture();
270         } else {
271             result = RpcResultBuilder.<Void>failed()
272                     .withError(RpcError.ErrorType.APPLICATION, "mode change already in progress")
273                     .buildFuture();
274         }
275         return result;
276     }
277
278     @Override
279     public void startScheduling(final DeviceInfo deviceInfo) {
280         if (!isStatisticsPollingOn) {
281             LOG.info("Statistics are shutdown for device: {}", deviceInfo.getNodeId());
282             return;
283         }
284
285         final StatisticsContext statisticsContext = contexts.get(deviceInfo);
286
287         if (statisticsContext == null) {
288             LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId());
289             return;
290         }
291
292         if (statisticsContext.isSchedulingEnabled()) {
293             LOG.debug("Statistics scheduling is already enabled for device: {}", deviceInfo.getNodeId());
294             return;
295         }
296
297         LOG.info("Scheduling statistics poll for device: {}", deviceInfo.getNodeId());
298
299         statisticsContext.setSchedulingEnabled(true);
300         scheduleNextPolling(
301                 statisticsContext.gainDeviceState(),
302                 deviceInfo,
303                 statisticsContext,
304                 new TimeCounter()
305         );
306     }
307
308     @Override
309     public void stopScheduling(final DeviceInfo deviceInfo) {
310         if (LOG.isDebugEnabled()) {
311             LOG.debug("Stopping statistics scheduling for device: {}", deviceInfo.getNodeId());
312         }
313
314         final StatisticsContext statisticsContext = contexts.get(deviceInfo);
315
316         if (statisticsContext == null) {
317             LOG.warn("Statistics context not found for device: {}", deviceInfo.getNodeId());
318             return;
319         }
320
321         statisticsContext.setSchedulingEnabled(false);
322     }
323
324     @Override
325     public void close() {
326         if (controlServiceRegistration != null) {
327             controlServiceRegistration.close();
328             controlServiceRegistration = null;
329         }
330
331         for (final Iterator<StatisticsContext> iterator = Iterators.consumingIterator(contexts.values().iterator());
332                 iterator.hasNext();) {
333             iterator.next().close();
334         }
335     }
336
337     @Override
338     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
339         this.deviceTerminationPhaseHandler = handler;
340     }
341
342     @Override
343     public void setIsStatisticsPollingOn(boolean isStatisticsPollingOn){
344         this.isStatisticsPollingOn = isStatisticsPollingOn;
345     }
346
347     public void onDeviceRemoved(DeviceInfo deviceInfo) {
348         contexts.remove(deviceInfo);
349         LOG.debug("Statistics context removed for node {}", deviceInfo.getLOGValue());
350     }
351
352     @Override
353     public void setBasicTimerDelay(final long basicTimerDelay) {
354         this.basicTimerDelay = basicTimerDelay;
355     }
356
357     @Override
358     public void setMaximumTimerDelay(final long maximumTimerDelay) {
359         this.maximumTimerDelay = maximumTimerDelay;
360     }
361 }