Bug 5692 - Direct statistics RPC
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsManagerImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import io.netty.util.Timeout;
20 import io.netty.util.TimerTask;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
22 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
23 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
24 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
25 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
26 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
27 import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
28 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
29 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.ChangeStatisticsWorkModeInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode;
36 import org.opendaylight.yangtools.yang.common.RpcError;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import javax.annotation.CheckForNull;
43 import javax.annotation.Nonnull;
44 import java.util.Iterator;
45 import java.util.concurrent.CancellationException;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.ConcurrentMap;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.Semaphore;
50 import java.util.concurrent.TimeUnit;
51
52 public class StatisticsManagerImpl implements StatisticsManager, StatisticsManagerControlService {
53
54     private static final Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
55
56     private static final long DEFAULT_STATS_TIMEOUT_SEC = 50L;
57
58     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
59     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
60
61     private final ConcurrentMap<NodeId, StatisticsContext> contexts = new ConcurrentHashMap<>();
62
63     private static final long basicTimerDelay = 3000;
64     private static long currentTimerDelay = basicTimerDelay;
65     private static final long maximumTimerDelay = 900000; //wait max 15 minutes for next statistics
66
67     private StatisticsWorkMode workMode = StatisticsWorkMode.COLLECTALL;
68     private final Semaphore workModeGuard = new Semaphore(1, true);
69     private boolean shuttingDownStatisticsPolling;
70     private BindingAwareBroker.RpcRegistration<StatisticsManagerControlService> controlServiceRegistration;
71
72     private final LifecycleConductor conductor;
73
74     @Override
75     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
76         deviceInitPhaseHandler = handler;
77     }
78
79     public StatisticsManagerImpl(@CheckForNull final RpcProviderRegistry rpcProviderRegistry,
80                                  final boolean shuttingDownStatisticsPolling,
81                                  final LifecycleConductor lifecycleConductor) {
82         Preconditions.checkArgument(rpcProviderRegistry != null);
83         this.controlServiceRegistration = Preconditions.checkNotNull(rpcProviderRegistry.addRpcImplementation(
84                 StatisticsManagerControlService.class, this));
85         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
86         this.conductor = lifecycleConductor;
87     }
88
89     @Override
90     public void onDeviceContextLevelUp(final NodeId nodeId) throws Exception {
91
92         final DeviceContext deviceContext = Preconditions.checkNotNull(conductor.getDeviceContext(nodeId));
93
94         final StatisticsContext statisticsContext = new StatisticsContextImpl(nodeId, shuttingDownStatisticsPolling, conductor);
95         Verify.verify(contexts.putIfAbsent(nodeId, statisticsContext) == null, "StatisticsCtx still not closed for Node {}", nodeId);
96
97         deviceContext.getDeviceState().setDeviceSynchronized(true);
98         deviceInitPhaseHandler.onDeviceContextLevelUp(nodeId);
99     }
100
101     @VisibleForTesting
102     void pollStatistics(final DeviceContext deviceContext,
103                                 final StatisticsContext statisticsContext,
104                                 final TimeCounter timeCounter) {
105
106         final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
107
108         if (!statisticsContext.isSchedulingEnabled()) {
109             LOG.debug("Disabling statistics scheduling for device: {}", nodeId);
110             return;
111         }
112         
113         if (!deviceContext.getDeviceState().isValid()) {
114             LOG.debug("Session is not valid for device: {}", nodeId);
115             return;
116         }
117
118         if (!deviceContext.getDeviceState().isStatisticsPollingEnabled()) {
119             LOG.debug("Statistics polling is currently disabled for device: {}", nodeId);
120             scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
121             return;
122         }
123
124         LOG.debug("POLLING ALL STATISTICS for device: {}", nodeId);
125         timeCounter.markStart();
126         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
127         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
128             @Override
129             public void onSuccess(final Boolean o) {
130                 timeCounter.addTimeMark();
131                 calculateTimerDelay(timeCounter);
132                 scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
133             }
134
135             @Override
136             public void onFailure(@Nonnull final Throwable throwable) {
137                 timeCounter.addTimeMark();
138                 LOG.warn("Statistics gathering for single node was not successful: {}", throwable.getMessage());
139                 LOG.trace("Statistics gathering for single node was not successful.. ", throwable);
140                 calculateTimerDelay(timeCounter);
141                 if (throwable instanceof CancellationException) {
142                     /** This often happens when something wrong with akka or DS, so closing connection will help to restart device **/
143                     conductor.closeConnection(deviceContext.getDeviceState().getNodeId());
144                 } else {
145                     scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
146                 }
147             }
148         });
149
150         final long averageTime = TimeUnit.MILLISECONDS.toSeconds(timeCounter.getAverageTimeBetweenMarks());
151         final long STATS_TIMEOUT_SEC = averageTime > 0 ? 3 * averageTime : DEFAULT_STATS_TIMEOUT_SEC;
152         final TimerTask timerTask = new TimerTask() {
153
154             @Override
155             public void run(final Timeout timeout) throws Exception {
156                 if (!deviceStatisticsCollectionFuture.isDone()) {
157                     LOG.info("Statistics collection for node {} still in progress even after {} secs", nodeId, STATS_TIMEOUT_SEC);
158                     deviceStatisticsCollectionFuture.cancel(true);
159                 }
160             }
161         };
162
163         conductor.newTimeout(timerTask, STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
164     }
165
166     private void scheduleNextPolling(final DeviceContext deviceContext,
167                                      final StatisticsContext statisticsContext,
168                                      final TimeCounter timeCounter) {
169         LOG.debug("SCHEDULING NEXT STATISTICS POLLING for device: {}", deviceContext.getDeviceState().getNodeId());
170         if (!shuttingDownStatisticsPolling) {
171             final Timeout pollTimeout = conductor.newTimeout(new TimerTask() {
172                 @Override
173                 public void run(final Timeout timeout) throws Exception {
174                     pollStatistics(deviceContext, statisticsContext, timeCounter);
175                 }
176             }, currentTimerDelay, TimeUnit.MILLISECONDS);
177             statisticsContext.setPollTimeout(pollTimeout);
178         }
179     }
180
181     @VisibleForTesting
182     void calculateTimerDelay(final TimeCounter timeCounter) {
183         final long averageStatisticsGatheringTime = timeCounter.getAverageTimeBetweenMarks();
184         if (averageStatisticsGatheringTime > currentTimerDelay) {
185             currentTimerDelay *= 2;
186             if (currentTimerDelay > maximumTimerDelay) {
187                 currentTimerDelay = maximumTimerDelay;
188             }
189         } else {
190             if (currentTimerDelay > basicTimerDelay) {
191                 currentTimerDelay /= 2;
192             } else {
193                 currentTimerDelay = basicTimerDelay;
194             }
195         }
196     }
197
198     @VisibleForTesting
199     static long getCurrentTimerDelay() {
200         return currentTimerDelay;
201     }
202
203     @Override
204     public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
205         final StatisticsContext statisticsContext = contexts.remove(deviceContext.getDeviceState().getNodeId());
206         if (null != statisticsContext) {
207             LOG.trace("Removing device context from stack. No more statistics gathering for device: {}", deviceContext.getDeviceState().getNodeId());
208             statisticsContext.close();
209         }
210         deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceContext);
211     }
212
213     @Override
214     public Future<RpcResult<GetStatisticsWorkModeOutput>> getStatisticsWorkMode() {
215         final GetStatisticsWorkModeOutputBuilder smModeOutputBld = new GetStatisticsWorkModeOutputBuilder();
216         smModeOutputBld.setMode(workMode);
217         return RpcResultBuilder.success(smModeOutputBld.build()).buildFuture();
218     }
219
220     @Override
221     public Future<RpcResult<Void>> changeStatisticsWorkMode(ChangeStatisticsWorkModeInput input) {
222         final Future<RpcResult<Void>> result;
223         // acquire exclusive access
224         if (workModeGuard.tryAcquire()) {
225             final StatisticsWorkMode targetWorkMode = input.getMode();
226             if (!workMode.equals(targetWorkMode)) {
227                 shuttingDownStatisticsPolling = StatisticsWorkMode.FULLYDISABLED.equals(targetWorkMode);
228                 // iterate through stats-ctx: propagate mode
229                 for (final StatisticsContext statisticsContext : contexts.values()) {
230                     final DeviceContext deviceContext = statisticsContext.getDeviceContext();
231                     switch (targetWorkMode) {
232                         case COLLECTALL:
233                             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
234                             for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
235                                 lifeCycleSource.setItemLifecycleListener(null);
236                             }
237                             break;
238                         case FULLYDISABLED:
239                             final Optional<Timeout> pollTimeout = statisticsContext.getPollTimeout();
240                             if (pollTimeout.isPresent()) {
241                                 pollTimeout.get().cancel();
242                             }
243                             for (final ItemLifeCycleSource lifeCycleSource : deviceContext.getItemLifeCycleSourceRegistry().getLifeCycleSources()) {
244                                 lifeCycleSource.setItemLifecycleListener(statisticsContext.getItemLifeCycleListener());
245                             }
246                             break;
247                         default:
248                             LOG.warn("Statistics work mode not supported: {}", targetWorkMode);
249                     }
250                 }
251                 workMode = targetWorkMode;
252             }
253             workModeGuard.release();
254             result = RpcResultBuilder.<Void>success().buildFuture();
255         } else {
256             result = RpcResultBuilder.<Void>failed()
257                     .withError(RpcError.ErrorType.APPLICATION, "mode change already in progress")
258                     .buildFuture();
259         }
260         return result;
261     }
262
263     @Override
264     public void startScheduling(final NodeId nodeId) {
265         if (shuttingDownStatisticsPolling) {
266             LOG.info("Statistics are shut down for device: {}", nodeId);
267             return;
268         }
269
270         final StatisticsContext statisticsContext = contexts.get(nodeId);
271
272         if (statisticsContext == null) {
273             LOG.warn("Statistics context not found for device: {}", nodeId);
274             return;
275         }
276
277         if (statisticsContext.isSchedulingEnabled()) {
278             LOG.debug("Statistics scheduling is already enabled for device: {}", nodeId);
279             return;
280         }
281
282         LOG.info("Scheduling statistics poll for device: {}", nodeId);
283         final DeviceContext deviceContext = conductor.getDeviceContext(nodeId);
284
285         if (deviceContext == null) {
286             LOG.warn("Device context not found for device: {}", nodeId);
287             return;
288         }
289
290         statisticsContext.setSchedulingEnabled(true);
291         scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
292     }
293
294     @Override
295     public void stopScheduling(final NodeId nodeId) {
296         LOG.debug("Stopping statistics scheduling for device: {}", nodeId);
297         final StatisticsContext statisticsContext = contexts.get(nodeId);
298
299         if (statisticsContext == null) {
300             LOG.warn("Statistics context not found for device: {}", nodeId);
301             return;
302         }
303
304         statisticsContext.setSchedulingEnabled(false);
305     }
306
307     @Override
308     public void close() {
309         if (controlServiceRegistration != null) {
310             controlServiceRegistration.close();
311             controlServiceRegistration = null;
312         }
313         for (final Iterator<StatisticsContext> iterator = Iterators.consumingIterator(contexts.values().iterator());
314                 iterator.hasNext();) {
315             iterator.next().close();
316         }
317     }
318
319     @Override
320     public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
321         this.deviceTerminPhaseHandler = handler;
322     }
323 }