Bug 7499 - ensure statistics scheduler does not die and keep trying while the control...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.ConnectionException;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
44 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
45 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
46 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
47 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
48 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
49 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 class StatisticsContextImpl implements StatisticsContext {
55
56     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
57     private static final String CONNECTION_CLOSED = "Connection closed.";
58
59     private final ItemLifecycleListener itemLifeCycleListener;
60     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
61     private final DeviceContext deviceContext;
62     private final DeviceState devState;
63     private final ListenableFuture<Boolean> emptyFuture;
64     private final boolean isStatisticsPollingOn;
65     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
66     private final Object collectionStatTypeLock = new Object();
67     @GuardedBy("collectionStatTypeLock")
68     private List<MultipartType> collectingStatType;
69
70     private StatisticsGatheringService statisticsGatheringService;
71     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
72     private Timeout pollTimeout;
73     private final DeviceInfo deviceInfo;
74     private final StatisticsManager myManager;
75     private final LifecycleService lifecycleService;
76
77     private volatile boolean schedulingEnabled;
78     private volatile CONTEXT_STATE state;
79     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
80     private ClusterInitializationPhaseHandler initialSubmitHandler;
81
82     private ListenableFuture<Boolean> lastDataGathering;
83
84     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
85                           final boolean isStatisticsPollingOn,
86                           @Nonnull final LifecycleService lifecycleService,
87                           @Nonnull final ConvertorExecutor convertorExecutor,
88                           @Nonnull final StatisticsManager myManager) {
89         this.lifecycleService = lifecycleService;
90         this.deviceContext = lifecycleService.getDeviceContext();
91         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
92         this.isStatisticsPollingOn = isStatisticsPollingOn;
93         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
94         emptyFuture = Futures.immediateFuture(false);
95         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
96         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
97         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
98         statListForCollectingInitialization();
99         this.state = CONTEXT_STATE.INITIALIZATION;
100         this.deviceInfo = deviceInfo;
101         this.myManager = myManager;
102         this.lastDataGathering = null;
103     }
104
105     @Override
106     public void statListForCollectingInitialization() {
107         synchronized (collectionStatTypeLock) {
108             final List<MultipartType> statListForCollecting = new ArrayList<>();
109             if (devState.isTableStatisticsAvailable()) {
110                 statListForCollecting.add(MultipartType.OFPMPTABLE);
111             }
112             if (devState.isFlowStatisticsAvailable()) {
113                 statListForCollecting.add(MultipartType.OFPMPFLOW);
114             }
115             if (devState.isGroupAvailable()) {
116                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
117                 statListForCollecting.add(MultipartType.OFPMPGROUP);
118             }
119             if (devState.isMetersAvailable()) {
120                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
121                 statListForCollecting.add(MultipartType.OFPMPMETER);
122             }
123             if (devState.isPortStatisticsAvailable()) {
124                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
125             }
126             if (devState.isQueueStatisticsAvailable()) {
127                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
128             }
129             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
130         }
131     }
132
133
134     @Override
135     public ListenableFuture<Boolean> initialGatherDynamicData() {
136         return gatherDynamicData(true);
137     }
138
139     @Override
140     public ListenableFuture<Boolean> gatherDynamicData(){
141         return gatherDynamicData(false);
142     }
143
144     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
145         this.lastDataGathering = null;
146         if (!isStatisticsPollingOn) {
147             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
148             return Futures.immediateFuture(Boolean.TRUE);
149         }
150         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
151         if (errorResultFuture != null) {
152             return errorResultFuture;
153         }
154         synchronized (collectionStatTypeLock) {
155             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
156             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
157
158             // write start timestamp to state snapshot container
159             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
160
161             statChainFuture(statIterator, settableStatResultFuture, initial);
162
163             // write end timestamp to state snapshot container
164             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
165                 @Override
166                 public void onSuccess(@Nullable final Boolean result) {
167                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
168                 }
169                 @Override
170                 public void onFailure(final Throwable t) {
171                     if (!(t instanceof TransactionChainClosedException)) {
172                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
173                     }
174                 }
175             });
176             this.lastDataGathering = settableStatResultFuture;
177             return settableStatResultFuture;
178         }
179     }
180
181     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
182         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
183
184         switch (multipartType) {
185             case OFPMPFLOW:
186                 result = collectFlowStatistics(multipartType, initial);
187                 break;
188             case OFPMPTABLE:
189                 result = collectTableStatistics(multipartType);
190                 break;
191             case OFPMPPORTSTATS:
192                 result = collectPortStatistics(multipartType);
193                 break;
194             case OFPMPQUEUE:
195                 result = collectQueueStatistics(multipartType);
196                 break;
197             case OFPMPGROUPDESC:
198                 result = collectGroupDescStatistics(multipartType);
199                 break;
200             case OFPMPGROUP:
201                 result = collectGroupStatistics(multipartType);
202                 break;
203             case OFPMPMETERCONFIG:
204                 result = collectMeterConfigStatistics(multipartType);
205                 break;
206             case OFPMPMETER:
207                 result = collectMeterStatistics(multipartType);
208                 break;
209             default:
210                 LOG.warn("Unsupported Statistics type {}", multipartType);
211         }
212
213         return result;
214     }
215
216
217     @Override
218     public <T> RequestContext<T> createRequestContext() {
219         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
220             @Override
221             public void close() {
222                 requestContexts.remove(this);
223             }
224         };
225         requestContexts.add(ret);
226         return ret;
227     }
228
229     @Override
230     public void close() {
231         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
232             if (LOG.isDebugEnabled()) {
233                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
234             }
235         } else {
236             try {
237                 stopClusterServices(true).get();
238             } catch (Exception e) {
239                 LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
240             }
241
242             this.state = CONTEXT_STATE.TERMINATION;
243
244             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
245                  iterator.hasNext(); ) {
246                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
247             }
248
249             if (null != pollTimeout && !pollTimeout.isExpired()) {
250                 pollTimeout.cancel();
251             }
252         }
253     }
254
255     @Override
256     public void setSchedulingEnabled(final boolean schedulingEnabled) {
257         this.schedulingEnabled = schedulingEnabled;
258     }
259
260     @Override
261     public boolean isSchedulingEnabled() {
262         return schedulingEnabled;
263     }
264
265     @Override
266     public void setPollTimeout(final Timeout pollTimeout) {
267         this.pollTimeout = pollTimeout;
268     }
269
270     @Override
271     public Optional<Timeout> getPollTimeout() {
272         return Optional.ofNullable(pollTimeout);
273     }
274
275     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
276         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
277             final String errMsg = String.format("Device connection is closed for Node : %s.",
278                     getDeviceInfo().getNodeId());
279             LOG.debug(errMsg);
280             resultFuture.setException(new ConnectionException(errMsg));
281             return;
282         }
283         if ( ! iterator.hasNext()) {
284             resultFuture.set(Boolean.TRUE);
285             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
286             return;
287         }
288
289         final MultipartType nextType = iterator.next();
290         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
291
292         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
293         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
294             @Override
295             public void onSuccess(final Boolean result) {
296                 statChainFuture(iterator, resultFuture, initial);
297             }
298             @Override
299             public void onFailure(@Nonnull final Throwable t) {
300                 resultFuture.setException(t);
301             }
302         });
303     }
304
305     /**
306      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
307      * which has to be returned from caller too
308      *
309      * @return
310      */
311     @VisibleForTesting
312     ListenableFuture<Boolean> deviceConnectionCheck() {
313         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
314             ListenableFuture<Boolean> resultingFuture;
315             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
316                 case RIP:
317                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
318                             deviceContext.getPrimaryConnectionContext().getConnectionState());
319                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
320                     break;
321                 default:
322                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
323                     break;
324             }
325             return resultingFuture;
326         }
327         return null;
328     }
329
330     //TODO: Refactor twice sending deviceContext into gatheringStatistics
331     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
332         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
333                 statisticsGatheringOnTheFlyService,
334                 getDeviceInfo(),
335                 /*MultipartType.OFPMPFLOW*/ multipartType,
336                 deviceContext,
337                 deviceContext,
338                 initial, multipartReplyTranslator) : emptyFuture;
339     }
340
341     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
342         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
343                 statisticsGatheringService,
344                 getDeviceInfo(),
345                 /*MultipartType.OFPMPTABLE*/ multipartType,
346                 deviceContext,
347                 deviceContext,
348                 false, multipartReplyTranslator) : emptyFuture;
349     }
350
351     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
352         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
353                 statisticsGatheringService,
354                 getDeviceInfo(),
355                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
356                 deviceContext,
357                 deviceContext,
358                 false, multipartReplyTranslator) : emptyFuture;
359     }
360
361     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
362         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
363                 statisticsGatheringService,
364                 getDeviceInfo(),
365                 /*MultipartType.OFPMPQUEUE*/ multipartType,
366                 deviceContext,
367                 deviceContext,
368                 false, multipartReplyTranslator);
369     }
370
371     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
372         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
373                 statisticsGatheringService,
374                 getDeviceInfo(),
375                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
376                 deviceContext,
377                 deviceContext,
378                 false, multipartReplyTranslator) : emptyFuture;
379     }
380
381     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
382         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
383                 statisticsGatheringService,
384                 getDeviceInfo(),
385                 /*MultipartType.OFPMPGROUP*/ multipartType,
386                 deviceContext,
387                 deviceContext,
388                 false, multipartReplyTranslator) : emptyFuture;
389     }
390
391     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
392         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
393                 statisticsGatheringService,
394                 getDeviceInfo(),
395                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
396                 deviceContext,
397                 deviceContext,
398                 false, multipartReplyTranslator) : emptyFuture;
399     }
400
401     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
402         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
403                 statisticsGatheringService,
404                 getDeviceInfo(),
405                 /*MultipartType.OFPMPMETER*/ multipartType,
406                 deviceContext,
407                 deviceContext,
408                 false, multipartReplyTranslator) : emptyFuture;
409     }
410
411     @VisibleForTesting
412     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
413         this.statisticsGatheringService = statisticsGatheringService;
414     }
415
416     @VisibleForTesting
417     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
418                                                              statisticsGatheringOnTheFlyService) {
419         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
420     }
421
422     @Override
423     public ItemLifecycleListener getItemLifeCycleListener () {
424         return itemLifeCycleListener;
425     }
426
427     @Override
428     public CONTEXT_STATE getState() {
429         return this.state;
430     }
431
432     @Override
433     public ServiceGroupIdentifier getServiceIdentifier() {
434         return this.deviceInfo.getServiceIdentifier();
435     }
436
437     @Override
438     public DeviceInfo getDeviceInfo() {
439         return this.deviceInfo;
440     }
441
442     @Override
443     public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
444         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
445             return Futures.immediateCancelledFuture();
446         }
447
448         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
449             @Nullable
450             @Override
451             public Void apply(@Nullable Object input) {
452                 schedulingEnabled = false;
453                 stopGatheringData();
454                 return null;
455             }
456         });
457     }
458
459     @Override
460     public DeviceState gainDeviceState() {
461         return gainDeviceContext().getDeviceState();
462     }
463
464     @Override
465     public DeviceContext gainDeviceContext() {
466         return this.lifecycleService.getDeviceContext();
467     }
468
469     @Override
470     public void stopGatheringData() {
471         if (Objects.nonNull(this.lastDataGathering)) {
472             if (LOG.isDebugEnabled()) {
473                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
474             }
475
476             lastDataGathering.cancel(true);
477         }
478     }
479
480     @Override
481     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
482         this.clusterInitializationPhaseHandler = handler;
483     }
484
485     @Override
486     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
487         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
488             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
489             return false;
490         }
491
492         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
493
494         this.statListForCollectingInitialization();
495         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
496
497             @Override
498             public void onSuccess(@Nullable Boolean aBoolean) {
499                 initialSubmitHandler.initialSubmitTransaction();
500             }
501
502             @Override
503             public void onFailure(Throwable throwable) {
504                 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
505                 lifecycleService.closeConnection();
506             }
507         });
508
509         if (this.isStatisticsPollingOn) {
510             myManager.startScheduling(deviceInfo);
511         }
512
513         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
514     }
515
516     @Override
517     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
518         this.initialSubmitHandler = initialSubmitHandler;
519     }
520 }