Disconnection improvements.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.ConnectionException;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
44 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
45 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
46 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
47 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
48 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
49 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
50 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
57
58     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
59     private static final String CONNECTION_CLOSED = "Connection closed.";
60
61     private final ItemLifecycleListener itemLifeCycleListener;
62     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
63     private final DeviceContext deviceContext;
64     private final DeviceState devState;
65     private final ListenableFuture<Boolean> emptyFuture;
66     private final boolean isStatisticsPollingOn;
67     private final Object collectionStatTypeLock = new Object();
68     private final ConvertorExecutor convertorExecutor;
69     private final MultipartWriterProvider statisticsWriterProvider;
70     @GuardedBy("collectionStatTypeLock")
71     private List<MultipartType> collectingStatType;
72
73     private StatisticsGatheringService<T> statisticsGatheringService;
74     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
75     private Timeout pollTimeout;
76     private final DeviceInfo deviceInfo;
77     private final StatisticsManager myManager;
78
79     private volatile boolean schedulingEnabled;
80     private volatile CONTEXT_STATE state;
81     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
82     private ClusterInitializationPhaseHandler initialSubmitHandler;
83
84     private ListenableFuture<Boolean> lastDataGathering;
85
86     StatisticsContextImpl(final boolean isStatisticsPollingOn,
87                           @Nonnull final DeviceContext deviceContext,
88                           @Nonnull final ConvertorExecutor convertorExecutor,
89                           @Nonnull final StatisticsManager myManager,
90                           @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
91         this.deviceContext = deviceContext;
92         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
93         this.isStatisticsPollingOn = isStatisticsPollingOn;
94         this.convertorExecutor = convertorExecutor;
95         emptyFuture = Futures.immediateFuture(false);
96         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
97         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
98             deviceContext, convertorExecutor, statisticsWriterProvider);
99         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
100         statListForCollectingInitialization();
101         this.state = CONTEXT_STATE.INITIALIZATION;
102         this.deviceInfo = deviceContext.getDeviceInfo();
103         this.myManager = myManager;
104         this.lastDataGathering = null;
105         this.statisticsWriterProvider = statisticsWriterProvider;
106     }
107
108     @Override
109     public void statListForCollectingInitialization() {
110         synchronized (collectionStatTypeLock) {
111             final List<MultipartType> statListForCollecting = new ArrayList<>();
112             if (devState.isTableStatisticsAvailable()) {
113                 statListForCollecting.add(MultipartType.OFPMPTABLE);
114             }
115             if (devState.isFlowStatisticsAvailable()) {
116                 statListForCollecting.add(MultipartType.OFPMPFLOW);
117             }
118             if (devState.isGroupAvailable()) {
119                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
120                 statListForCollecting.add(MultipartType.OFPMPGROUP);
121             }
122             if (devState.isMetersAvailable()) {
123                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
124                 statListForCollecting.add(MultipartType.OFPMPMETER);
125             }
126             if (devState.isPortStatisticsAvailable()) {
127                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
128             }
129             if (devState.isQueueStatisticsAvailable()) {
130                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
131             }
132             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
133         }
134     }
135
136
137     @Override
138     public ListenableFuture<Boolean> initialGatherDynamicData() {
139         return gatherDynamicData(true);
140     }
141
142     @Override
143     public ListenableFuture<Boolean> gatherDynamicData(){
144         return gatherDynamicData(false);
145     }
146
147     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
148         this.lastDataGathering = null;
149         if (!isStatisticsPollingOn) {
150             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
151             return Futures.immediateFuture(Boolean.TRUE);
152         }
153         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
154         if (errorResultFuture != null) {
155             return errorResultFuture;
156         }
157         synchronized (collectionStatTypeLock) {
158             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
159             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
160
161             // write start timestamp to state snapshot container
162             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
163
164             statChainFuture(statIterator, settableStatResultFuture, initial);
165
166             // write end timestamp to state snapshot container
167             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
168                 @Override
169                 public void onSuccess(@Nullable final Boolean result) {
170                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
171                 }
172                 @Override
173                 public void onFailure(final Throwable t) {
174                     if (!(t instanceof TransactionChainClosedException)) {
175                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
176                     }
177                 }
178             });
179             this.lastDataGathering = settableStatResultFuture;
180             return settableStatResultFuture;
181         }
182     }
183
184     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
185         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
186
187         switch (multipartType) {
188             case OFPMPFLOW:
189                 result = collectFlowStatistics(multipartType, initial);
190                 break;
191             case OFPMPTABLE:
192                 result = collectTableStatistics(multipartType);
193                 break;
194             case OFPMPPORTSTATS:
195                 result = collectPortStatistics(multipartType);
196                 break;
197             case OFPMPQUEUE:
198                 result = collectQueueStatistics(multipartType);
199                 break;
200             case OFPMPGROUPDESC:
201                 result = collectGroupDescStatistics(multipartType);
202                 break;
203             case OFPMPGROUP:
204                 result = collectGroupStatistics(multipartType);
205                 break;
206             case OFPMPMETERCONFIG:
207                 result = collectMeterConfigStatistics(multipartType);
208                 break;
209             case OFPMPMETER:
210                 result = collectMeterStatistics(multipartType);
211                 break;
212             default:
213                 LOG.warn("Unsupported Statistics type {}", multipartType);
214         }
215
216         return result;
217     }
218
219
220     @Override
221     public <T> RequestContext<T> createRequestContext() {
222         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
223             @Override
224             public void close() {
225                 requestContexts.remove(this);
226             }
227         };
228         requestContexts.add(ret);
229         return ret;
230     }
231
232     @Override
233     public void close() {
234         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
235             if (LOG.isDebugEnabled()) {
236                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
237             }
238         } else {
239             try {
240                 stopClusterServices(true).get();
241             } catch (Exception e) {
242                 LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
243             }
244
245             this.state = CONTEXT_STATE.TERMINATION;
246
247             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
248                  iterator.hasNext(); ) {
249                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
250             }
251
252             if (null != pollTimeout && !pollTimeout.isExpired()) {
253                 pollTimeout.cancel();
254             }
255         }
256     }
257
258     @Override
259     public void setSchedulingEnabled(final boolean schedulingEnabled) {
260         this.schedulingEnabled = schedulingEnabled;
261     }
262
263     @Override
264     public boolean isSchedulingEnabled() {
265         return schedulingEnabled;
266     }
267
268     @Override
269     public void setPollTimeout(final Timeout pollTimeout) {
270         this.pollTimeout = pollTimeout;
271     }
272
273     @Override
274     public Optional<Timeout> getPollTimeout() {
275         return Optional.ofNullable(pollTimeout);
276     }
277
278     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
279         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
280             final String errMsg = String.format("Device connection is closed for Node : %s.",
281                 getDeviceInfo().getNodeId());
282             LOG.debug(errMsg);
283             resultFuture.setException(new ConnectionException(errMsg));
284             return;
285         }
286         if ( ! iterator.hasNext()) {
287             resultFuture.set(Boolean.TRUE);
288             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
289             return;
290         }
291
292         final MultipartType nextType = iterator.next();
293         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
294
295         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
296         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
297             @Override
298             public void onSuccess(final Boolean result) {
299                 statChainFuture(iterator, resultFuture, initial);
300             }
301             @Override
302             public void onFailure(@Nonnull final Throwable t) {
303                 resultFuture.setException(t);
304             }
305         });
306     }
307
308     /**
309      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
310      * which has to be returned from caller too
311      *
312      * @return future
313      */
314     @VisibleForTesting
315     ListenableFuture<Boolean> deviceConnectionCheck() {
316         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
317             ListenableFuture<Boolean> resultingFuture;
318             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
319                 case RIP:
320                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
321                         deviceContext.getPrimaryConnectionContext().getConnectionState());
322                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
323                     break;
324                 default:
325                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
326                     break;
327             }
328             return resultingFuture;
329         }
330         return null;
331     }
332
333     //TODO: Refactor twice sending deviceContext into gatheringStatistics
334     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
335         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
336             statisticsGatheringOnTheFlyService,
337             getDeviceInfo(),
338             /*MultipartType.OFPMPFLOW*/ multipartType,
339             deviceContext,
340             deviceContext,
341             initial,
342             convertorExecutor,
343             statisticsWriterProvider) : emptyFuture;
344     }
345
346     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
347         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
348             statisticsGatheringService,
349             getDeviceInfo(),
350                 /*MultipartType.OFPMPTABLE*/ multipartType,
351             deviceContext,
352             deviceContext,
353             false,
354             convertorExecutor,
355             statisticsWriterProvider) : emptyFuture;
356     }
357
358     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
359         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
360             statisticsGatheringService,
361             getDeviceInfo(),
362                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
363             deviceContext,
364             deviceContext,
365             false,
366             convertorExecutor,
367             statisticsWriterProvider) : emptyFuture;
368     }
369
370     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
371         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
372             statisticsGatheringService,
373             getDeviceInfo(),
374                 /*MultipartType.OFPMPQUEUE*/ multipartType,
375             deviceContext,
376             deviceContext,
377             false,
378             convertorExecutor,
379             statisticsWriterProvider);
380     }
381
382     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
383         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
384             statisticsGatheringService,
385             getDeviceInfo(),
386                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
387             deviceContext,
388             deviceContext,
389             false,
390             convertorExecutor,
391             statisticsWriterProvider) : emptyFuture;
392     }
393
394     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
395         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
396             statisticsGatheringService,
397             getDeviceInfo(),
398                 /*MultipartType.OFPMPGROUP*/ multipartType,
399             deviceContext,
400             deviceContext,
401             false,
402             convertorExecutor,
403             statisticsWriterProvider) : emptyFuture;
404     }
405
406     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
407         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
408             statisticsGatheringService,
409             getDeviceInfo(),
410                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
411             deviceContext,
412             deviceContext,
413             false,
414             convertorExecutor,
415             statisticsWriterProvider) : emptyFuture;
416     }
417
418     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
419         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
420             statisticsGatheringService,
421             getDeviceInfo(),
422                 /*MultipartType.OFPMPMETER*/ multipartType,
423             deviceContext,
424             deviceContext,
425             false,
426             convertorExecutor,
427             statisticsWriterProvider) : emptyFuture;
428     }
429
430     @VisibleForTesting
431     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
432         this.statisticsGatheringService = statisticsGatheringService;
433     }
434
435     @VisibleForTesting
436     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
437         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
438     }
439
440     @Override
441     public ItemLifecycleListener getItemLifeCycleListener () {
442         return itemLifeCycleListener;
443     }
444
445     @Override
446     public CONTEXT_STATE getState() {
447         return this.state;
448     }
449
450     @Override
451     public ServiceGroupIdentifier getServiceIdentifier() {
452         return this.deviceInfo.getServiceIdentifier();
453     }
454
455     @Override
456     public DeviceInfo getDeviceInfo() {
457         return this.deviceInfo;
458     }
459
460     @Override
461     public ListenableFuture<Void> stopClusterServices() {
462         if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
463             return Futures.immediateCancelledFuture();
464         }
465
466         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
467             @Nullable
468             @Override
469             public Void apply(@Nullable Object input) {
470                 schedulingEnabled = false;
471                 stopGatheringData();
472                 return null;
473             }
474         });
475     }
476
477     @Override
478     public DeviceState gainDeviceState() {
479         return gainDeviceContext().getDeviceState();
480     }
481
482     @Override
483     public DeviceContext gainDeviceContext() {
484         return this.deviceContext;
485     }
486
487     @Override
488     public void stopGatheringData() {
489         if (Objects.nonNull(this.lastDataGathering)) {
490             if (LOG.isDebugEnabled()) {
491                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
492             }
493
494             lastDataGathering.cancel(true);
495         }
496     }
497
498     @Override
499     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
500         this.clusterInitializationPhaseHandler = handler;
501     }
502
503     @Override
504     public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
505
506         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
507
508         this.statListForCollectingInitialization();
509         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
510
511             @Override
512             public void onSuccess(@Nullable Boolean aBoolean) {
513                 initialSubmitHandler.initialSubmitTransaction();
514             }
515
516             @Override
517             public void onFailure(@Nonnull Throwable throwable) {
518                 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
519                 mastershipChangeListener.onNotAbleToStartMastership(deviceInfo);
520             }
521         });
522
523         if (this.isStatisticsPollingOn) {
524             myManager.startScheduling(deviceInfo);
525         }
526
527         return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
528     }
529
530     @Override
531     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
532         this.initialSubmitHandler = initialSubmitHandler;
533     }
534 }