Merge "Update comments around flat-batch service" into stable/boron
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.Optional;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
43 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
44 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
46 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
47 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 class StatisticsContextImpl implements StatisticsContext {
53
54     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
55     private static final String CONNECTION_CLOSED = "Connection closed.";
56
57     private final ItemLifecycleListener itemLifeCycleListener;
58     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
59     private final DeviceContext deviceContext;
60     private final DeviceState devState;
61     private final ListenableFuture<Boolean> emptyFuture;
62     private final boolean shuttingDownStatisticsPolling;
63     private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
64     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
65     @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
66     private List<MultipartType> collectingStatType;
67
68     private StatisticsGatheringService statisticsGatheringService;
69     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
70     private Timeout pollTimeout;
71     private final DeviceInfo deviceInfo;
72     private final StatisticsManager myManager;
73     private final LifecycleService lifecycleService;
74
75     private volatile boolean schedulingEnabled;
76     private volatile CONTEXT_STATE state;
77     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
78     private ClusterInitializationPhaseHandler initialSubmitHandler;
79
80     private ListenableFuture<Boolean> lastDataGathering;
81
82     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
83                           final boolean shuttingDownStatisticsPolling,
84                           @Nonnull final LifecycleService lifecycleService,
85                           @Nonnull final ConvertorExecutor convertorExecutor,
86                           @Nonnull final StatisticsManager myManager) {
87         this.lifecycleService = lifecycleService;
88         this.deviceContext = lifecycleService.getDeviceContext();
89         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
90         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
91         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
92         emptyFuture = Futures.immediateFuture(false);
93         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
94         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
95         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
96         statListForCollectingInitialization();
97         setState(CONTEXT_STATE.INITIALIZATION);
98         this.deviceInfo = deviceInfo;
99         this.myManager = myManager;
100         this.lastDataGathering = null;
101     }
102
103     @Override
104     public void statListForCollectingInitialization() {
105         synchronized (COLLECTION_STAT_TYPE_LOCK) {
106             final List<MultipartType> statListForCollecting = new ArrayList<>();
107             if (devState.isTableStatisticsAvailable()) {
108                 statListForCollecting.add(MultipartType.OFPMPTABLE);
109             }
110             if (devState.isFlowStatisticsAvailable()) {
111                 statListForCollecting.add(MultipartType.OFPMPFLOW);
112             }
113             if (devState.isGroupAvailable()) {
114                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
115                 statListForCollecting.add(MultipartType.OFPMPGROUP);
116             }
117             if (devState.isMetersAvailable()) {
118                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
119                 statListForCollecting.add(MultipartType.OFPMPMETER);
120             }
121             if (devState.isPortStatisticsAvailable()) {
122                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
123             }
124             if (devState.isQueueStatisticsAvailable()) {
125                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
126             }
127             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
128         }
129     }
130
131
132     @Override
133     public ListenableFuture<Boolean> initialGatherDynamicData() {
134         return gatherDynamicData(true);
135     }
136
137     @Override
138     public ListenableFuture<Boolean> gatherDynamicData(){
139         return gatherDynamicData(false);
140     }
141
142     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
143         this.lastDataGathering = null;
144         if (shuttingDownStatisticsPolling) {
145             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
146             return Futures.immediateFuture(Boolean.TRUE);
147         }
148         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
149         if (errorResultFuture != null) {
150             return errorResultFuture;
151         }
152         synchronized (COLLECTION_STAT_TYPE_LOCK) {
153             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
154             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
155
156             // write start timestamp to state snapshot container
157             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
158
159             statChainFuture(statIterator, settableStatResultFuture, initial);
160
161             // write end timestamp to state snapshot container
162             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
163                 @Override
164                 public void onSuccess(@Nullable final Boolean result) {
165                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
166                 }
167                 @Override
168                 public void onFailure(final Throwable t) {
169                     if (!(t instanceof TransactionChainClosedException)) {
170                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
171                     }
172                 }
173             });
174             this.lastDataGathering = settableStatResultFuture;
175             return settableStatResultFuture;
176         }
177     }
178
179     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
180         switch (multipartType) {
181             case OFPMPFLOW:
182                 return collectFlowStatistics(multipartType, initial);
183             case OFPMPTABLE:
184                 return collectTableStatistics(multipartType);
185             case OFPMPPORTSTATS:
186                 return collectPortStatistics(multipartType);
187             case OFPMPQUEUE:
188                 return collectQueueStatistics(multipartType);
189             case OFPMPGROUPDESC:
190                 return collectGroupDescStatistics(multipartType);
191             case OFPMPGROUP:
192                 return collectGroupStatistics(multipartType);
193             case OFPMPMETERCONFIG:
194                 return collectMeterConfigStatistics(multipartType);
195             case OFPMPMETER:
196                 return collectMeterStatistics(multipartType);
197             default:
198                 LOG.warn("Unsuported Statistics type {}", multipartType);
199                 return Futures.immediateCheckedFuture(Boolean.TRUE);
200         }
201     }
202
203
204     @Override
205     public <T> RequestContext<T> createRequestContext() {
206         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
207             @Override
208             public void close() {
209                 requestContexts.remove(this);
210             }
211         };
212         requestContexts.add(ret);
213         return ret;
214     }
215
216     @Override
217     public void close() {
218         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
219             if (LOG.isDebugEnabled()) {
220                 LOG.debug("Statistics context is already in state TERMINATION.");
221             }
222         } else {
223             stopGatheringData();
224             setState(CONTEXT_STATE.TERMINATION);
225             schedulingEnabled = false;
226             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
227                  iterator.hasNext(); ) {
228                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
229             }
230             if (null != pollTimeout && !pollTimeout.isExpired()) {
231                 pollTimeout.cancel();
232             }
233         }
234     }
235
236     @Override
237     public void setSchedulingEnabled(final boolean schedulingEnabled) {
238         this.schedulingEnabled = schedulingEnabled;
239     }
240
241     @Override
242     public boolean isSchedulingEnabled() {
243         return schedulingEnabled;
244     }
245
246     @Override
247     public void setPollTimeout(final Timeout pollTimeout) {
248         this.pollTimeout = pollTimeout;
249     }
250
251     @Override
252     public Optional<Timeout> getPollTimeout() {
253         return Optional.ofNullable(pollTimeout);
254     }
255
256     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
257         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
258             final String errMsg = String.format("Device connection is closed for Node : %s.",
259                     getDeviceInfo().getNodeId());
260             LOG.debug(errMsg);
261             resultFuture.setException(new IllegalStateException(errMsg));
262             return;
263         }
264         if ( ! iterator.hasNext()) {
265             resultFuture.set(Boolean.TRUE);
266             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
267             return;
268         }
269
270         final MultipartType nextType = iterator.next();
271         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
272
273         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
274         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
275             @Override
276             public void onSuccess(final Boolean result) {
277                 statChainFuture(iterator, resultFuture, initial);
278             }
279             @Override
280             public void onFailure(@Nonnull final Throwable t) {
281                 resultFuture.setException(t);
282             }
283         });
284     }
285
286     /**
287      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
288      * which has to be returned from caller too
289      *
290      * @return
291      */
292     @VisibleForTesting
293     ListenableFuture<Boolean> deviceConnectionCheck() {
294         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
295             ListenableFuture<Boolean> resultingFuture;
296             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
297                 case RIP:
298                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
299                             deviceContext.getPrimaryConnectionContext().getConnectionState());
300                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
301                     break;
302                 default:
303                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
304                     break;
305             }
306             return resultingFuture;
307         }
308         return null;
309     }
310
311     //TODO: Refactor twice sending deviceContext into gatheringStatistics
312     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
313         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
314                 statisticsGatheringOnTheFlyService,
315                 getDeviceInfo(),
316                 /*MultipartType.OFPMPFLOW*/ multipartType,
317                 deviceContext,
318                 deviceContext,
319                 initial, multipartReplyTranslator) : emptyFuture;
320     }
321
322     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
323         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
324                 statisticsGatheringService,
325                 getDeviceInfo(),
326                 /*MultipartType.OFPMPTABLE*/ multipartType,
327                 deviceContext,
328                 deviceContext,
329                 false, multipartReplyTranslator) : emptyFuture;
330     }
331
332     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
333         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
334                 statisticsGatheringService,
335                 getDeviceInfo(),
336                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
337                 deviceContext,
338                 deviceContext,
339                 false, multipartReplyTranslator) : emptyFuture;
340     }
341
342     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
343         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
344                 statisticsGatheringService,
345                 getDeviceInfo(),
346                 /*MultipartType.OFPMPQUEUE*/ multipartType,
347                 deviceContext,
348                 deviceContext,
349                 false, multipartReplyTranslator);
350     }
351
352     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
353         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
354                 statisticsGatheringService,
355                 getDeviceInfo(),
356                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
357                 deviceContext,
358                 deviceContext,
359                 false, multipartReplyTranslator) : emptyFuture;
360     }
361
362     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
363         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
364                 statisticsGatheringService,
365                 getDeviceInfo(),
366                 /*MultipartType.OFPMPGROUP*/ multipartType,
367                 deviceContext,
368                 deviceContext,
369                 false, multipartReplyTranslator) : emptyFuture;
370     }
371
372     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
373         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
374                 statisticsGatheringService,
375                 getDeviceInfo(),
376                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
377                 deviceContext,
378                 deviceContext,
379                 false, multipartReplyTranslator) : emptyFuture;
380     }
381
382     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
383         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
384                 statisticsGatheringService,
385                 getDeviceInfo(),
386                 /*MultipartType.OFPMPMETER*/ multipartType,
387                 deviceContext,
388                 deviceContext,
389                 false, multipartReplyTranslator) : emptyFuture;
390     }
391
392     @VisibleForTesting
393     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
394         this.statisticsGatheringService = statisticsGatheringService;
395     }
396
397     @VisibleForTesting
398     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
399                                                              statisticsGatheringOnTheFlyService) {
400         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
401     }
402
403     @Override
404     public ItemLifecycleListener getItemLifeCycleListener () {
405         return itemLifeCycleListener;
406     }
407
408     @Override
409     public CONTEXT_STATE getState() {
410         return this.state;
411     }
412
413     @Override
414     public void setState(CONTEXT_STATE state) {
415         this.state = state;
416     }
417
418     @Override
419     public ServiceGroupIdentifier getServiceIdentifier() {
420         return this.deviceInfo.getServiceIdentifier();
421     }
422
423     @Override
424     public DeviceInfo getDeviceInfo() {
425         return this.deviceInfo;
426     }
427
428     @Override
429     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
430         stopGatheringData();
431         myManager.stopScheduling(deviceInfo);
432         return Futures.immediateFuture(null);
433     }
434
435     @Override
436     public DeviceState gainDeviceState() {
437         return gainDeviceContext().getDeviceState();
438     }
439
440     @Override
441     public DeviceContext gainDeviceContext() {
442         return this.lifecycleService.getDeviceContext();
443     }
444
445     @Override
446     public void stopGatheringData() {
447         if (Objects.nonNull(this.lastDataGathering)) {
448             if (LOG.isDebugEnabled()) {
449                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
450             }
451             this.lastDataGathering.cancel(true);
452         }
453     }
454
455     @Override
456     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
457         this.clusterInitializationPhaseHandler = handler;
458     }
459
460     @Override
461     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
462
463         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
464             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
465             return false;
466         }
467
468         if (!this.shuttingDownStatisticsPolling) {
469
470             LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
471
472             this.statListForCollectingInitialization();
473             Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
474
475                         @Override
476                         public void onSuccess(@Nullable Boolean aBoolean) {
477                             initialSubmitHandler.initialSubmitTransaction();
478                         }
479
480                         @Override
481                         public void onFailure(Throwable throwable) {
482                             LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
483                             lifecycleService.closeConnection();
484                         }
485                     });
486
487                     myManager.startScheduling(deviceInfo);
488
489         }
490
491         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
492     }
493
494     @Override
495     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
496         this.initialSubmitHandler = initialSubmitHandler;
497     }
498 }