Merge "BUG-6890:"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.Optional;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
43 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
44 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
46 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
47 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 class StatisticsContextImpl implements StatisticsContext {
53
54     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
55     private static final String CONNECTION_CLOSED = "Connection closed.";
56
57     private final ItemLifecycleListener itemLifeCycleListener;
58     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
59     private final DeviceContext deviceContext;
60     private final DeviceState devState;
61     private final ListenableFuture<Boolean> emptyFuture;
62     private final boolean shuttingDownStatisticsPolling;
63     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
64     private final Object collectionStatTypeLock = new Object();
65     @GuardedBy("collectionStatTypeLock")
66     private List<MultipartType> collectingStatType;
67
68     private StatisticsGatheringService statisticsGatheringService;
69     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
70     private Timeout pollTimeout;
71     private final DeviceInfo deviceInfo;
72     private final StatisticsManager myManager;
73     private final LifecycleService lifecycleService;
74
75     private volatile boolean schedulingEnabled;
76     private volatile CONTEXT_STATE state;
77     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
78     private ClusterInitializationPhaseHandler initialSubmitHandler;
79
80     private ListenableFuture<Boolean> lastDataGathering;
81
82     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
83                           final boolean shuttingDownStatisticsPolling,
84                           @Nonnull final LifecycleService lifecycleService,
85                           @Nonnull final ConvertorExecutor convertorExecutor,
86                           @Nonnull final StatisticsManager myManager) {
87         this.lifecycleService = lifecycleService;
88         this.deviceContext = lifecycleService.getDeviceContext();
89         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
90         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
91         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
92         emptyFuture = Futures.immediateFuture(false);
93         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
94         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
95         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
96         statListForCollectingInitialization();
97         setState(CONTEXT_STATE.INITIALIZATION);
98         this.deviceInfo = deviceInfo;
99         this.myManager = myManager;
100         this.lastDataGathering = null;
101     }
102
103     @Override
104     public void statListForCollectingInitialization() {
105         synchronized (collectionStatTypeLock) {
106             final List<MultipartType> statListForCollecting = new ArrayList<>();
107             if (devState.isTableStatisticsAvailable()) {
108                 statListForCollecting.add(MultipartType.OFPMPTABLE);
109             }
110             if (devState.isFlowStatisticsAvailable()) {
111                 statListForCollecting.add(MultipartType.OFPMPFLOW);
112             }
113             if (devState.isGroupAvailable()) {
114                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
115                 statListForCollecting.add(MultipartType.OFPMPGROUP);
116             }
117             if (devState.isMetersAvailable()) {
118                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
119                 statListForCollecting.add(MultipartType.OFPMPMETER);
120             }
121             if (devState.isPortStatisticsAvailable()) {
122                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
123             }
124             if (devState.isQueueStatisticsAvailable()) {
125                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
126             }
127             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
128         }
129     }
130
131
132     @Override
133     public ListenableFuture<Boolean> initialGatherDynamicData() {
134         return gatherDynamicData(true);
135     }
136
137     @Override
138     public ListenableFuture<Boolean> gatherDynamicData(){
139         return gatherDynamicData(false);
140     }
141
142     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
143         this.lastDataGathering = null;
144         if (shuttingDownStatisticsPolling) {
145             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
146             return Futures.immediateFuture(Boolean.TRUE);
147         }
148         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
149         if (errorResultFuture != null) {
150             return errorResultFuture;
151         }
152         synchronized (collectionStatTypeLock) {
153             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
154             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
155
156             // write start timestamp to state snapshot container
157             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
158
159             statChainFuture(statIterator, settableStatResultFuture, initial);
160
161             // write end timestamp to state snapshot container
162             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
163                 @Override
164                 public void onSuccess(@Nullable final Boolean result) {
165                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
166                 }
167                 @Override
168                 public void onFailure(final Throwable t) {
169                     if (!(t instanceof TransactionChainClosedException)) {
170                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
171                     }
172                 }
173             });
174             this.lastDataGathering = settableStatResultFuture;
175             return settableStatResultFuture;
176         }
177     }
178
179     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
180         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
181
182         switch (multipartType) {
183             case OFPMPFLOW:
184                 result = collectFlowStatistics(multipartType, initial);
185                 break;
186             case OFPMPTABLE:
187                 result = collectTableStatistics(multipartType);
188                 break;
189             case OFPMPPORTSTATS:
190                 result = collectPortStatistics(multipartType);
191                 break;
192             case OFPMPQUEUE:
193                 result = collectQueueStatistics(multipartType);
194                 break;
195             case OFPMPGROUPDESC:
196                 result = collectGroupDescStatistics(multipartType);
197                 break;
198             case OFPMPGROUP:
199                 result = collectGroupStatistics(multipartType);
200                 break;
201             case OFPMPMETERCONFIG:
202                 result = collectMeterConfigStatistics(multipartType);
203                 break;
204             case OFPMPMETER:
205                 result = collectMeterStatistics(multipartType);
206                 break;
207             default:
208                 LOG.warn("Unsupported Statistics type {}", multipartType);
209         }
210
211         return result;
212     }
213
214
215     @Override
216     public <T> RequestContext<T> createRequestContext() {
217         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
218             @Override
219             public void close() {
220                 requestContexts.remove(this);
221             }
222         };
223         requestContexts.add(ret);
224         return ret;
225     }
226
227     @Override
228     public void close() {
229         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
230             if (LOG.isDebugEnabled()) {
231                 LOG.debug("Statistics context is already in state TERMINATION.");
232             }
233         } else {
234             stopGatheringData();
235             setState(CONTEXT_STATE.TERMINATION);
236             schedulingEnabled = false;
237             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
238                  iterator.hasNext(); ) {
239                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
240             }
241             if (null != pollTimeout && !pollTimeout.isExpired()) {
242                 pollTimeout.cancel();
243             }
244         }
245     }
246
247     @Override
248     public void setSchedulingEnabled(final boolean schedulingEnabled) {
249         this.schedulingEnabled = schedulingEnabled;
250     }
251
252     @Override
253     public boolean isSchedulingEnabled() {
254         return schedulingEnabled;
255     }
256
257     @Override
258     public void setPollTimeout(final Timeout pollTimeout) {
259         this.pollTimeout = pollTimeout;
260     }
261
262     @Override
263     public Optional<Timeout> getPollTimeout() {
264         return Optional.ofNullable(pollTimeout);
265     }
266
267     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
268         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
269             final String errMsg = String.format("Device connection is closed for Node : %s.",
270                     getDeviceInfo().getNodeId());
271             LOG.debug(errMsg);
272             resultFuture.setException(new IllegalStateException(errMsg));
273             return;
274         }
275         if ( ! iterator.hasNext()) {
276             resultFuture.set(Boolean.TRUE);
277             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
278             return;
279         }
280
281         final MultipartType nextType = iterator.next();
282         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
283
284         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
285         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
286             @Override
287             public void onSuccess(final Boolean result) {
288                 statChainFuture(iterator, resultFuture, initial);
289             }
290             @Override
291             public void onFailure(@Nonnull final Throwable t) {
292                 resultFuture.setException(t);
293             }
294         });
295     }
296
297     /**
298      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
299      * which has to be returned from caller too
300      *
301      * @return
302      */
303     @VisibleForTesting
304     ListenableFuture<Boolean> deviceConnectionCheck() {
305         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
306             ListenableFuture<Boolean> resultingFuture;
307             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
308                 case RIP:
309                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
310                             deviceContext.getPrimaryConnectionContext().getConnectionState());
311                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
312                     break;
313                 default:
314                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
315                     break;
316             }
317             return resultingFuture;
318         }
319         return null;
320     }
321
322     //TODO: Refactor twice sending deviceContext into gatheringStatistics
323     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
324         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
325                 statisticsGatheringOnTheFlyService,
326                 getDeviceInfo(),
327                 /*MultipartType.OFPMPFLOW*/ multipartType,
328                 deviceContext,
329                 deviceContext,
330                 initial, multipartReplyTranslator) : emptyFuture;
331     }
332
333     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
334         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
335                 statisticsGatheringService,
336                 getDeviceInfo(),
337                 /*MultipartType.OFPMPTABLE*/ multipartType,
338                 deviceContext,
339                 deviceContext,
340                 false, multipartReplyTranslator) : emptyFuture;
341     }
342
343     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
344         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
345                 statisticsGatheringService,
346                 getDeviceInfo(),
347                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
348                 deviceContext,
349                 deviceContext,
350                 false, multipartReplyTranslator) : emptyFuture;
351     }
352
353     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
354         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
355                 statisticsGatheringService,
356                 getDeviceInfo(),
357                 /*MultipartType.OFPMPQUEUE*/ multipartType,
358                 deviceContext,
359                 deviceContext,
360                 false, multipartReplyTranslator);
361     }
362
363     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
364         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
365                 statisticsGatheringService,
366                 getDeviceInfo(),
367                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
368                 deviceContext,
369                 deviceContext,
370                 false, multipartReplyTranslator) : emptyFuture;
371     }
372
373     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
374         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
375                 statisticsGatheringService,
376                 getDeviceInfo(),
377                 /*MultipartType.OFPMPGROUP*/ multipartType,
378                 deviceContext,
379                 deviceContext,
380                 false, multipartReplyTranslator) : emptyFuture;
381     }
382
383     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
384         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
385                 statisticsGatheringService,
386                 getDeviceInfo(),
387                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
388                 deviceContext,
389                 deviceContext,
390                 false, multipartReplyTranslator) : emptyFuture;
391     }
392
393     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
394         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
395                 statisticsGatheringService,
396                 getDeviceInfo(),
397                 /*MultipartType.OFPMPMETER*/ multipartType,
398                 deviceContext,
399                 deviceContext,
400                 false, multipartReplyTranslator) : emptyFuture;
401     }
402
403     @VisibleForTesting
404     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
405         this.statisticsGatheringService = statisticsGatheringService;
406     }
407
408     @VisibleForTesting
409     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
410                                                              statisticsGatheringOnTheFlyService) {
411         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
412     }
413
414     @Override
415     public ItemLifecycleListener getItemLifeCycleListener () {
416         return itemLifeCycleListener;
417     }
418
419     @Override
420     public CONTEXT_STATE getState() {
421         return this.state;
422     }
423
424     @Override
425     public void setState(CONTEXT_STATE state) {
426         this.state = state;
427     }
428
429     @Override
430     public ServiceGroupIdentifier getServiceIdentifier() {
431         return this.deviceInfo.getServiceIdentifier();
432     }
433
434     @Override
435     public DeviceInfo getDeviceInfo() {
436         return this.deviceInfo;
437     }
438
439     @Override
440     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
441         stopGatheringData();
442         myManager.stopScheduling(deviceInfo);
443         return Futures.immediateFuture(null);
444     }
445
446     @Override
447     public DeviceState gainDeviceState() {
448         return gainDeviceContext().getDeviceState();
449     }
450
451     @Override
452     public DeviceContext gainDeviceContext() {
453         return this.lifecycleService.getDeviceContext();
454     }
455
456     @Override
457     public void stopGatheringData() {
458         if (Objects.nonNull(this.lastDataGathering)) {
459             if (LOG.isDebugEnabled()) {
460                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
461             }
462             this.lastDataGathering.cancel(true);
463         }
464     }
465
466     @Override
467     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
468         this.clusterInitializationPhaseHandler = handler;
469     }
470
471     @Override
472     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
473
474         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
475             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
476             return false;
477         }
478
479         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
480
481         this.statListForCollectingInitialization();
482         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
483
484             @Override
485             public void onSuccess(@Nullable Boolean aBoolean) {
486                 initialSubmitHandler.initialSubmitTransaction();
487             }
488
489             @Override
490             public void onFailure(Throwable throwable) {
491                 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
492                 lifecycleService.closeConnection();
493             }
494         });
495
496         if (!this.shuttingDownStatisticsPolling) {
497             myManager.startScheduling(deviceInfo);
498         }
499
500         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
501     }
502
503     @Override
504     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
505         this.initialSubmitHandler = initialSubmitHandler;
506     }
507 }