Merge "Replace odl-dlux-core with odl-dluxapps-topology"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.ConnectionException;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
44 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
45 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
46 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
47 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
48 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
49 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
50 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
57
    private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
    // Error text attached to every request context that close() terminates forcibly.
    private static final String CONNECTION_CLOSED = "Connection closed.";

    private final ItemLifecycleListener itemLifeCycleListener;
    // Request contexts handed out by createRequestContext() that have not been closed yet.
    private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
    // Device context obtained from the lifecycle service at construction time.
    private final DeviceContext deviceContext;
    // Device state taken from deviceContext; checked non-null in the constructor.
    private final DeviceState devState;
    // Future already completed with false; returned by the collect*Statistics helpers
    // when the corresponding statistics type is not available on the device.
    private final ListenableFuture<Boolean> emptyFuture;
    // When false, gatherDynamicData returns immediately and no polling is scheduled.
    private final boolean isStatisticsPollingOn;
    // Monitor protecting collectingStatType.
    private final Object collectionStatTypeLock = new Object();
    private final ConvertorExecutor convertorExecutor;
    private final MultipartWriterProvider statisticsWriterProvider;
    // Immutable list of multipart types to poll, rebuilt by statListForCollectingInitialization().
    @GuardedBy("collectionStatTypeLock")
    private List<MultipartType> collectingStatType;

    // Replaceable through the @VisibleForTesting setters, hence not final.
    private StatisticsGatheringService<T> statisticsGatheringService;
    private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
    // Handle of the pending poll timer, if any; cancelled in close().
    private Timeout pollTimeout;
    private final DeviceInfo deviceInfo;
    private final StatisticsManager myManager;
    private final LifecycleService lifecycleService;

    private volatile boolean schedulingEnabled;
    private volatile CONTEXT_STATE state;
    // Next handler in the initialization chain; invoked at the end of onContextInstantiateService.
    private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
    // Handler whose initialSubmitTransaction() is called after the initial gathering succeeds.
    private ClusterInitializationPhaseHandler initialSubmitHandler;

    // Result future of the most recently started gathering round, or null when none was started.
    private ListenableFuture<Boolean> lastDataGathering;
86
87     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
88                           final boolean isStatisticsPollingOn,
89                           @Nonnull final LifecycleService lifecycleService,
90                           @Nonnull final ConvertorExecutor convertorExecutor,
91                           @Nonnull final StatisticsManager myManager,
92                           @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
93         this.lifecycleService = lifecycleService;
94         this.deviceContext = lifecycleService.getDeviceContext();
95         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
96         this.isStatisticsPollingOn = isStatisticsPollingOn;
97         this.convertorExecutor = convertorExecutor;
98         emptyFuture = Futures.immediateFuture(false);
99         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
100         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
101             deviceContext, convertorExecutor, statisticsWriterProvider);
102         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
103         statListForCollectingInitialization();
104         this.state = CONTEXT_STATE.INITIALIZATION;
105         this.deviceInfo = deviceInfo;
106         this.myManager = myManager;
107         this.lastDataGathering = null;
108         this.statisticsWriterProvider = statisticsWriterProvider;
109     }
110
111     @Override
112     public void statListForCollectingInitialization() {
113         synchronized (collectionStatTypeLock) {
114             final List<MultipartType> statListForCollecting = new ArrayList<>();
115             if (devState.isTableStatisticsAvailable()) {
116                 statListForCollecting.add(MultipartType.OFPMPTABLE);
117             }
118             if (devState.isFlowStatisticsAvailable()) {
119                 statListForCollecting.add(MultipartType.OFPMPFLOW);
120             }
121             if (devState.isGroupAvailable()) {
122                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
123                 statListForCollecting.add(MultipartType.OFPMPGROUP);
124             }
125             if (devState.isMetersAvailable()) {
126                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
127                 statListForCollecting.add(MultipartType.OFPMPMETER);
128             }
129             if (devState.isPortStatisticsAvailable()) {
130                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
131             }
132             if (devState.isQueueStatisticsAvailable()) {
133                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
134             }
135             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
136         }
137     }
138
139
    /**
     * Starts a gathering round flagged as the initial one.
     *
     * @return the future produced by {@code gatherDynamicData(true)}
     */
    @Override
    public ListenableFuture<Boolean> initialGatherDynamicData() {
        return gatherDynamicData(true);
    }
144
    /**
     * Starts a regular (non-initial) gathering round.
     *
     * @return the future produced by {@code gatherDynamicData(false)}
     */
    @Override
    public ListenableFuture<Boolean> gatherDynamicData(){
        return gatherDynamicData(false);
    }
149
150     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
151         this.lastDataGathering = null;
152         if (!isStatisticsPollingOn) {
153             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
154             return Futures.immediateFuture(Boolean.TRUE);
155         }
156         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
157         if (errorResultFuture != null) {
158             return errorResultFuture;
159         }
160         synchronized (collectionStatTypeLock) {
161             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
162             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
163
164             // write start timestamp to state snapshot container
165             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
166
167             statChainFuture(statIterator, settableStatResultFuture, initial);
168
169             // write end timestamp to state snapshot container
170             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
171                 @Override
172                 public void onSuccess(@Nullable final Boolean result) {
173                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
174                 }
175                 @Override
176                 public void onFailure(final Throwable t) {
177                     if (!(t instanceof TransactionChainClosedException)) {
178                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
179                     }
180                 }
181             });
182             this.lastDataGathering = settableStatResultFuture;
183             return settableStatResultFuture;
184         }
185     }
186
187     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
188         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
189
190         switch (multipartType) {
191             case OFPMPFLOW:
192                 result = collectFlowStatistics(multipartType, initial);
193                 break;
194             case OFPMPTABLE:
195                 result = collectTableStatistics(multipartType);
196                 break;
197             case OFPMPPORTSTATS:
198                 result = collectPortStatistics(multipartType);
199                 break;
200             case OFPMPQUEUE:
201                 result = collectQueueStatistics(multipartType);
202                 break;
203             case OFPMPGROUPDESC:
204                 result = collectGroupDescStatistics(multipartType);
205                 break;
206             case OFPMPGROUP:
207                 result = collectGroupStatistics(multipartType);
208                 break;
209             case OFPMPMETERCONFIG:
210                 result = collectMeterConfigStatistics(multipartType);
211                 break;
212             case OFPMPMETER:
213                 result = collectMeterStatistics(multipartType);
214                 break;
215             default:
216                 LOG.warn("Unsupported Statistics type {}", multipartType);
217         }
218
219         return result;
220     }
221
222
223     @Override
224     public <T> RequestContext<T> createRequestContext() {
225         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
226             @Override
227             public void close() {
228                 requestContexts.remove(this);
229             }
230         };
231         requestContexts.add(ret);
232         return ret;
233     }
234
235     @Override
236     public void close() {
237         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
238             if (LOG.isDebugEnabled()) {
239                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
240             }
241         } else {
242             try {
243                 stopClusterServices(true).get();
244             } catch (Exception e) {
245                 LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
246             }
247
248             this.state = CONTEXT_STATE.TERMINATION;
249
250             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
251                  iterator.hasNext(); ) {
252                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
253             }
254
255             if (null != pollTimeout && !pollTimeout.isExpired()) {
256                 pollTimeout.cancel();
257             }
258         }
259     }
260
    /**
     * Sets the scheduling flag of this context.
     *
     * @param schedulingEnabled new value of the volatile {@code schedulingEnabled} field
     */
    @Override
    public void setSchedulingEnabled(final boolean schedulingEnabled) {
        this.schedulingEnabled = schedulingEnabled;
    }
265
    /**
     * Returns the scheduling flag of this context.
     *
     * @return current value of the volatile {@code schedulingEnabled} field
     */
    @Override
    public boolean isSchedulingEnabled() {
        return schedulingEnabled;
    }
270
    /**
     * Remembers the poll timeout handle; {@link #close()} cancels it if it has not expired.
     *
     * @param pollTimeout timeout handle to store, replacing any previous one
     */
    @Override
    public void setPollTimeout(final Timeout pollTimeout) {
        this.pollTimeout = pollTimeout;
    }
275
    /**
     * Returns the stored poll timeout handle.
     *
     * @return the handle wrapped in an {@link Optional}; empty when none is stored
     */
    @Override
    public Optional<Timeout> getPollTimeout() {
        return Optional.ofNullable(pollTimeout);
    }
280
281     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
282         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
283             final String errMsg = String.format("Device connection is closed for Node : %s.",
284                 getDeviceInfo().getNodeId());
285             LOG.debug(errMsg);
286             resultFuture.setException(new ConnectionException(errMsg));
287             return;
288         }
289         if ( ! iterator.hasNext()) {
290             resultFuture.set(Boolean.TRUE);
291             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
292             return;
293         }
294
295         final MultipartType nextType = iterator.next();
296         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
297
298         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
299         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
300             @Override
301             public void onSuccess(final Boolean result) {
302                 statChainFuture(iterator, resultFuture, initial);
303             }
304             @Override
305             public void onFailure(@Nonnull final Throwable t) {
306                 resultFuture.setException(t);
307             }
308         });
309     }
310
311     /**
312      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
313      * which has to be returned from caller too
314      *
315      * @return future
316      */
317     @VisibleForTesting
318     ListenableFuture<Boolean> deviceConnectionCheck() {
319         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
320             ListenableFuture<Boolean> resultingFuture;
321             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
322                 case RIP:
323                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
324                         deviceContext.getPrimaryConnectionContext().getConnectionState());
325                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
326                     break;
327                 default:
328                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
329                     break;
330             }
331             return resultingFuture;
332         }
333         return null;
334     }
335
336     //TODO: Refactor twice sending deviceContext into gatheringStatistics
337     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
338         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
339             statisticsGatheringOnTheFlyService,
340             getDeviceInfo(),
341             /*MultipartType.OFPMPFLOW*/ multipartType,
342             deviceContext,
343             deviceContext,
344             initial,
345             convertorExecutor,
346             statisticsWriterProvider) : emptyFuture;
347     }
348
349     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
350         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
351             statisticsGatheringService,
352             getDeviceInfo(),
353                 /*MultipartType.OFPMPTABLE*/ multipartType,
354             deviceContext,
355             deviceContext,
356             false,
357             convertorExecutor,
358             statisticsWriterProvider) : emptyFuture;
359     }
360
361     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
362         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
363             statisticsGatheringService,
364             getDeviceInfo(),
365                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
366             deviceContext,
367             deviceContext,
368             false,
369             convertorExecutor,
370             statisticsWriterProvider) : emptyFuture;
371     }
372
373     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
374         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
375             statisticsGatheringService,
376             getDeviceInfo(),
377                 /*MultipartType.OFPMPQUEUE*/ multipartType,
378             deviceContext,
379             deviceContext,
380             false,
381             convertorExecutor,
382             statisticsWriterProvider);
383     }
384
385     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
386         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
387             statisticsGatheringService,
388             getDeviceInfo(),
389                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
390             deviceContext,
391             deviceContext,
392             false,
393             convertorExecutor,
394             statisticsWriterProvider) : emptyFuture;
395     }
396
397     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
398         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
399             statisticsGatheringService,
400             getDeviceInfo(),
401                 /*MultipartType.OFPMPGROUP*/ multipartType,
402             deviceContext,
403             deviceContext,
404             false,
405             convertorExecutor,
406             statisticsWriterProvider) : emptyFuture;
407     }
408
409     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
410         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
411             statisticsGatheringService,
412             getDeviceInfo(),
413                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
414             deviceContext,
415             deviceContext,
416             false,
417             convertorExecutor,
418             statisticsWriterProvider) : emptyFuture;
419     }
420
421     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
422         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
423             statisticsGatheringService,
424             getDeviceInfo(),
425                 /*MultipartType.OFPMPMETER*/ multipartType,
426             deviceContext,
427             deviceContext,
428             false,
429             convertorExecutor,
430             statisticsWriterProvider) : emptyFuture;
431     }
432
    /**
     * Replaces the gathering service created by the constructor; intended for tests.
     *
     * @param statisticsGatheringService service used by all collectors except the flow one
     */
    @VisibleForTesting
    void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
        this.statisticsGatheringService = statisticsGatheringService;
    }
437
    /**
     * Replaces the on-the-fly gathering service created by the constructor; intended for tests.
     *
     * @param statisticsGatheringOnTheFlyService service used by the flow statistics collector
     */
    @VisibleForTesting
    void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
        this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
    }
442
443     @Override
444     public ItemLifecycleListener getItemLifeCycleListener () {
445         return itemLifeCycleListener;
446     }
447
448     @Override
449     public CONTEXT_STATE getState() {
450         return this.state;
451     }
452
453     @Override
454     public ServiceGroupIdentifier getServiceIdentifier() {
455         return this.deviceInfo.getServiceIdentifier();
456     }
457
458     @Override
459     public DeviceInfo getDeviceInfo() {
460         return this.deviceInfo;
461     }
462
463     @Override
464     public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
465         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
466             return Futures.immediateCancelledFuture();
467         }
468
469         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
470             @Nullable
471             @Override
472             public Void apply(@Nullable Object input) {
473                 schedulingEnabled = false;
474                 stopGatheringData();
475                 return null;
476             }
477         });
478     }
479
480     @Override
481     public DeviceState gainDeviceState() {
482         return gainDeviceContext().getDeviceState();
483     }
484
485     @Override
486     public DeviceContext gainDeviceContext() {
487         return this.lifecycleService.getDeviceContext();
488     }
489
490     @Override
491     public void stopGatheringData() {
492         if (Objects.nonNull(this.lastDataGathering)) {
493             if (LOG.isDebugEnabled()) {
494                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
495             }
496
497             lastDataGathering.cancel(true);
498         }
499     }
500
    /**
     * Sets the handler that {@link #onContextInstantiateService(ConnectionContext)} delegates to
     * as its last step.
     *
     * @param handler next handler of the initialization chain
     */
    @Override
    public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
        this.clusterInitializationPhaseHandler = handler;
    }
505
506     @Override
507     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
508         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
509             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
510             return false;
511         }
512
513         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
514
515         this.statListForCollectingInitialization();
516         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
517
518             @Override
519             public void onSuccess(@Nullable Boolean aBoolean) {
520                 initialSubmitHandler.initialSubmitTransaction();
521             }
522
523             @Override
524             public void onFailure(@Nonnull Throwable throwable) {
525                 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
526                 lifecycleService.closeConnection();
527             }
528         });
529
530         if (this.isStatisticsPollingOn) {
531             myManager.startScheduling(deviceInfo);
532         }
533
534         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
535     }
536
    /**
     * Sets the handler whose {@code initialSubmitTransaction()} is invoked once the initial
     * statistics gathering has succeeded.
     *
     * @param initialSubmitHandler handler to notify after the initial gathering round
     */
    @Override
    public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
        this.initialSubmitHandler = initialSubmitHandler;
    }
541 }