SONAR TD - StatisticsContextImpl, StatisticsManagerImpl
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
41 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
42 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
43 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 class StatisticsContextImpl implements StatisticsContext {
52
53     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
54     private static final String CONNECTION_CLOSED = "Connection closed.";
55
56     private final ItemLifecycleListener itemLifeCycleListener;
57     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
58     private final DeviceContext deviceContext;
59     private final DeviceState devState;
60     private final ListenableFuture<Boolean> emptyFuture;
61     private final boolean shuttingDownStatisticsPolling;
62     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
63     private final Object collectionStatTypeLock = new Object();
64     @GuardedBy("collectionStatTypeLock")
65     private List<MultipartType> collectingStatType;
66
67     private StatisticsGatheringService statisticsGatheringService;
68     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
69     private Timeout pollTimeout;
70     private final DeviceInfo deviceInfo;
71     private final StatisticsManager myManager;
72     private final LifecycleService lifecycleService;
73
74     private volatile boolean schedulingEnabled;
75     private volatile CONTEXT_STATE state;
76     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
77     private ClusterInitializationPhaseHandler initialSubmitHandler;
78
79     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
80                           final boolean shuttingDownStatisticsPolling,
81                           @Nonnull final LifecycleService lifecycleService,
82                           @Nonnull final ConvertorExecutor convertorExecutor,
83                           @Nonnull final StatisticsManager myManager) {
84         this.lifecycleService = lifecycleService;
85         this.deviceContext = lifecycleService.getDeviceContext();
86         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
87         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
88         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
89         emptyFuture = Futures.immediateFuture(false);
90         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
91         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
92         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
93         statListForCollectingInitialization();
94         setState(CONTEXT_STATE.INITIALIZATION);
95         this.deviceInfo = deviceInfo;
96         this.myManager = myManager;
97     }
98
99     @Override
100     public void statListForCollectingInitialization() {
101         synchronized (collectionStatTypeLock) {
102             final List<MultipartType> statListForCollecting = new ArrayList<>();
103             if (devState.isTableStatisticsAvailable()) {
104                 statListForCollecting.add(MultipartType.OFPMPTABLE);
105             }
106             if (devState.isFlowStatisticsAvailable()) {
107                 statListForCollecting.add(MultipartType.OFPMPFLOW);
108             }
109             if (devState.isGroupAvailable()) {
110                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
111                 statListForCollecting.add(MultipartType.OFPMPGROUP);
112             }
113             if (devState.isMetersAvailable()) {
114                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
115                 statListForCollecting.add(MultipartType.OFPMPMETER);
116             }
117             if (devState.isPortStatisticsAvailable()) {
118                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
119             }
120             if (devState.isQueueStatisticsAvailable()) {
121                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
122             }
123             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
124         }
125     }
126
127
128     @Override
129     public ListenableFuture<Boolean> initialGatherDynamicData() {
130         return gatherDynamicData(true);
131     }
132
133     @Override
134     public ListenableFuture<Boolean> gatherDynamicData(){
135         return gatherDynamicData(false);
136     }
137
138     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
139         if (shuttingDownStatisticsPolling) {
140             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
141             return Futures.immediateFuture(Boolean.TRUE);
142         }
143         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
144         if (errorResultFuture != null) {
145             return errorResultFuture;
146         }
147         synchronized (collectionStatTypeLock) {
148             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
149             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
150
151             // write start timestamp to state snapshot container
152             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
153
154             statChainFuture(statIterator, settableStatResultFuture, initial);
155
156             // write end timestamp to state snapshot container
157             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
158                 @Override
159                 public void onSuccess(@Nullable final Boolean result) {
160                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
161                 }
162                 @Override
163                 public void onFailure(final Throwable t) {
164                     if (!(t instanceof TransactionChainClosedException)) {
165                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
166                     }
167                 }
168             });
169             return settableStatResultFuture;
170         }
171     }
172
173     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
174         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
175
176         switch (multipartType) {
177             case OFPMPFLOW:
178                 result = collectFlowStatistics(multipartType, initial);
179                 break;
180             case OFPMPTABLE:
181                 result = collectTableStatistics(multipartType);
182                 break;
183             case OFPMPPORTSTATS:
184                 result = collectPortStatistics(multipartType);
185                 break;
186             case OFPMPQUEUE:
187                 result = collectQueueStatistics(multipartType);
188                 break;
189             case OFPMPGROUPDESC:
190                 result = collectGroupDescStatistics(multipartType);
191                 break;
192             case OFPMPGROUP:
193                 result = collectGroupStatistics(multipartType);
194                 break;
195             case OFPMPMETERCONFIG:
196                 result = collectMeterConfigStatistics(multipartType);
197                 break;
198             case OFPMPMETER:
199                 result = collectMeterStatistics(multipartType);
200                 break;
201             default:
202                 LOG.warn("Unsupported Statistics type {}", multipartType);
203         }
204
205         return result;
206     }
207
208
209     @Override
210     public <T> RequestContext<T> createRequestContext() {
211         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
212             @Override
213             public void close() {
214                 requestContexts.remove(this);
215             }
216         };
217         requestContexts.add(ret);
218         return ret;
219     }
220
221     @Override
222     public void close() {
223         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
224             if (LOG.isDebugEnabled()) {
225                 LOG.debug("Statistics context is already in state TERMINATION.");
226             }
227         } else {
228             setState(CONTEXT_STATE.TERMINATION);
229             schedulingEnabled = false;
230             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
231                  iterator.hasNext(); ) {
232                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
233             }
234             if (null != pollTimeout && !pollTimeout.isExpired()) {
235                 pollTimeout.cancel();
236             }
237         }
238     }
239
240     @Override
241     public void setSchedulingEnabled(final boolean schedulingEnabled) {
242         this.schedulingEnabled = schedulingEnabled;
243     }
244
245     @Override
246     public boolean isSchedulingEnabled() {
247         return schedulingEnabled;
248     }
249
250     @Override
251     public void setPollTimeout(final Timeout pollTimeout) {
252         this.pollTimeout = pollTimeout;
253     }
254
255     @Override
256     public Optional<Timeout> getPollTimeout() {
257         return Optional.ofNullable(pollTimeout);
258     }
259
260     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
261         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
262             final String errMsg = String.format("Device connection is closed for Node : %s.",
263                     getDeviceInfo().getNodeId());
264             LOG.debug(errMsg);
265             resultFuture.setException(new IllegalStateException(errMsg));
266             return;
267         }
268         if ( ! iterator.hasNext()) {
269             resultFuture.set(Boolean.TRUE);
270             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
271             return;
272         }
273
274         final MultipartType nextType = iterator.next();
275         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
276
277         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
278         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
279             @Override
280             public void onSuccess(final Boolean result) {
281                 statChainFuture(iterator, resultFuture, initial);
282             }
283             @Override
284             public void onFailure(@Nonnull final Throwable t) {
285                 resultFuture.setException(t);
286             }
287         });
288     }
289
290     /**
291      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
292      * which has to be returned from caller too
293      *
294      * @return
295      */
296     @VisibleForTesting
297     ListenableFuture<Boolean> deviceConnectionCheck() {
298         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
299             ListenableFuture<Boolean> resultingFuture;
300             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
301                 case RIP:
302                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
303                             deviceContext.getPrimaryConnectionContext().getConnectionState());
304                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
305                     break;
306                 default:
307                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
308                     break;
309             }
310             return resultingFuture;
311         }
312         return null;
313     }
314
315     //TODO: Refactor twice sending deviceContext into gatheringStatistics
316     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
317         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
318                 statisticsGatheringOnTheFlyService,
319                 getDeviceInfo(),
320                 /*MultipartType.OFPMPFLOW*/ multipartType,
321                 deviceContext,
322                 deviceContext,
323                 initial, multipartReplyTranslator) : emptyFuture;
324     }
325
326     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
327         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
328                 statisticsGatheringService,
329                 getDeviceInfo(),
330                 /*MultipartType.OFPMPTABLE*/ multipartType,
331                 deviceContext,
332                 deviceContext,
333                 false, multipartReplyTranslator) : emptyFuture;
334     }
335
336     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
337         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
338                 statisticsGatheringService,
339                 getDeviceInfo(),
340                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
341                 deviceContext,
342                 deviceContext,
343                 false, multipartReplyTranslator) : emptyFuture;
344     }
345
346     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
347         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
348                 statisticsGatheringService,
349                 getDeviceInfo(),
350                 /*MultipartType.OFPMPQUEUE*/ multipartType,
351                 deviceContext,
352                 deviceContext,
353                 false, multipartReplyTranslator);
354     }
355
356     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
357         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
358                 statisticsGatheringService,
359                 getDeviceInfo(),
360                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
361                 deviceContext,
362                 deviceContext,
363                 false, multipartReplyTranslator) : emptyFuture;
364     }
365
366     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
367         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
368                 statisticsGatheringService,
369                 getDeviceInfo(),
370                 /*MultipartType.OFPMPGROUP*/ multipartType,
371                 deviceContext,
372                 deviceContext,
373                 false, multipartReplyTranslator) : emptyFuture;
374     }
375
376     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
377         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
378                 statisticsGatheringService,
379                 getDeviceInfo(),
380                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
381                 deviceContext,
382                 deviceContext,
383                 false, multipartReplyTranslator) : emptyFuture;
384     }
385
386     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
387         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
388                 statisticsGatheringService,
389                 getDeviceInfo(),
390                 /*MultipartType.OFPMPMETER*/ multipartType,
391                 deviceContext,
392                 deviceContext,
393                 false, multipartReplyTranslator) : emptyFuture;
394     }
395
396     @VisibleForTesting
397     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
398         this.statisticsGatheringService = statisticsGatheringService;
399     }
400
401     @VisibleForTesting
402     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
403                                                              statisticsGatheringOnTheFlyService) {
404         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
405     }
406
407     @Override
408     public ItemLifecycleListener getItemLifeCycleListener () {
409         return itemLifeCycleListener;
410     }
411
412     @Override
413     public CONTEXT_STATE getState() {
414         return this.state;
415     }
416
417     @Override
418     public void setState(CONTEXT_STATE state) {
419         this.state = state;
420     }
421
422     @Override
423     public ServiceGroupIdentifier getServiceIdentifier() {
424         return this.deviceInfo.getServiceIdentifier();
425     }
426
427     @Override
428     public DeviceInfo getDeviceInfo() {
429         return this.deviceInfo;
430     }
431
432     @Override
433     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
434         myManager.stopScheduling(deviceInfo);
435         return Futures.immediateFuture(null);
436     }
437
438     @Override
439     public LifecycleService getLifecycleService() {
440         return lifecycleService;
441     }
442
443     @Override
444     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
445         this.clusterInitializationPhaseHandler = handler;
446     }
447
448     @Override
449     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
450
451         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
452             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
453             return false;
454         }
455
456         if (!this.shuttingDownStatisticsPolling) {
457
458             LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
459
460             this.statListForCollectingInitialization();
461             Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
462
463                         @Override
464                         public void onSuccess(@Nullable Boolean aBoolean) {
465                             initialSubmitHandler.initialSubmitTransaction();
466                         }
467
468                         @Override
469                         public void onFailure(Throwable throwable) {
470                             LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
471                             lifecycleService.closeConnection();
472                         }
473                     });
474
475                     myManager.startScheduling(deviceInfo);
476
477         }
478
479         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
480     }
481
482     @Override
483     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
484         this.initialSubmitHandler = initialSubmitHandler;
485     }
486 }