eab57c27958dd96bb539a608fb38a525553115a9
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
37 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
40 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
43 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
44 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
45 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
46 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
47 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
48 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 class StatisticsContextImpl implements StatisticsContext {
54
55     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
56     private static final String CONNECTION_CLOSED = "Connection closed.";
57
58     private final ItemLifecycleListener itemLifeCycleListener;
59     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
60     private final DeviceContext deviceContext;
61     private final DeviceState devState;
62     private final ListenableFuture<Boolean> emptyFuture;
63     private final boolean isStatisticsPollingOn;
64     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
65     private final Object collectionStatTypeLock = new Object();
66     @GuardedBy("collectionStatTypeLock")
67     private List<MultipartType> collectingStatType;
68
69     private StatisticsGatheringService statisticsGatheringService;
70     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
71     private Timeout pollTimeout;
72     private final DeviceInfo deviceInfo;
73     private final StatisticsManager myManager;
74     private final LifecycleService lifecycleService;
75
76     private volatile boolean schedulingEnabled;
77     private volatile CONTEXT_STATE state;
78     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
79     private ClusterInitializationPhaseHandler initialSubmitHandler;
80
81     private ListenableFuture<Boolean> lastDataGathering;
82
83     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
84                           final boolean isStatisticsPollingOn,
85                           @Nonnull final LifecycleService lifecycleService,
86                           @Nonnull final ConvertorExecutor convertorExecutor,
87                           @Nonnull final StatisticsManager myManager) {
88         this.lifecycleService = lifecycleService;
89         this.deviceContext = lifecycleService.getDeviceContext();
90         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
91         this.isStatisticsPollingOn = isStatisticsPollingOn;
92         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
93         emptyFuture = Futures.immediateFuture(false);
94         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
95         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
96         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
97         statListForCollectingInitialization();
98         this.state = CONTEXT_STATE.INITIALIZATION;
99         this.deviceInfo = deviceInfo;
100         this.myManager = myManager;
101         this.lastDataGathering = null;
102     }
103
104     @Override
105     public void statListForCollectingInitialization() {
106         synchronized (collectionStatTypeLock) {
107             final List<MultipartType> statListForCollecting = new ArrayList<>();
108             if (devState.isTableStatisticsAvailable()) {
109                 statListForCollecting.add(MultipartType.OFPMPTABLE);
110             }
111             if (devState.isFlowStatisticsAvailable()) {
112                 statListForCollecting.add(MultipartType.OFPMPFLOW);
113             }
114             if (devState.isGroupAvailable()) {
115                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
116                 statListForCollecting.add(MultipartType.OFPMPGROUP);
117             }
118             if (devState.isMetersAvailable()) {
119                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
120                 statListForCollecting.add(MultipartType.OFPMPMETER);
121             }
122             if (devState.isPortStatisticsAvailable()) {
123                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
124             }
125             if (devState.isQueueStatisticsAvailable()) {
126                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
127             }
128             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
129         }
130     }
131
132
133     @Override
134     public ListenableFuture<Boolean> initialGatherDynamicData() {
135         return gatherDynamicData(true);
136     }
137
138     @Override
139     public ListenableFuture<Boolean> gatherDynamicData(){
140         return gatherDynamicData(false);
141     }
142
143     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
144         this.lastDataGathering = null;
145         if (!isStatisticsPollingOn) {
146             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
147             return Futures.immediateFuture(Boolean.TRUE);
148         }
149         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
150         if (errorResultFuture != null) {
151             return errorResultFuture;
152         }
153         synchronized (collectionStatTypeLock) {
154             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
155             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
156
157             // write start timestamp to state snapshot container
158             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
159
160             statChainFuture(statIterator, settableStatResultFuture, initial);
161
162             // write end timestamp to state snapshot container
163             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
164                 @Override
165                 public void onSuccess(@Nullable final Boolean result) {
166                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
167                 }
168                 @Override
169                 public void onFailure(final Throwable t) {
170                     if (!(t instanceof TransactionChainClosedException)) {
171                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
172                     }
173                 }
174             });
175             this.lastDataGathering = settableStatResultFuture;
176             return settableStatResultFuture;
177         }
178     }
179
180     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
181         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
182
183         switch (multipartType) {
184             case OFPMPFLOW:
185                 result = collectFlowStatistics(multipartType, initial);
186                 break;
187             case OFPMPTABLE:
188                 result = collectTableStatistics(multipartType);
189                 break;
190             case OFPMPPORTSTATS:
191                 result = collectPortStatistics(multipartType);
192                 break;
193             case OFPMPQUEUE:
194                 result = collectQueueStatistics(multipartType);
195                 break;
196             case OFPMPGROUPDESC:
197                 result = collectGroupDescStatistics(multipartType);
198                 break;
199             case OFPMPGROUP:
200                 result = collectGroupStatistics(multipartType);
201                 break;
202             case OFPMPMETERCONFIG:
203                 result = collectMeterConfigStatistics(multipartType);
204                 break;
205             case OFPMPMETER:
206                 result = collectMeterStatistics(multipartType);
207                 break;
208             default:
209                 LOG.warn("Unsupported Statistics type {}", multipartType);
210         }
211
212         return result;
213     }
214
215
216     @Override
217     public <T> RequestContext<T> createRequestContext() {
218         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
219             @Override
220             public void close() {
221                 requestContexts.remove(this);
222             }
223         };
224         requestContexts.add(ret);
225         return ret;
226     }
227
228     @Override
229     public void close() {
230         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
231             if (LOG.isDebugEnabled()) {
232                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
233             }
234         } else {
235             try {
236                 stopClusterServices(true).get();
237             } catch (Exception e) {
238                 LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
239             }
240
241             this.state = CONTEXT_STATE.TERMINATION;
242
243             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
244                  iterator.hasNext(); ) {
245                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
246             }
247
248             if (null != pollTimeout && !pollTimeout.isExpired()) {
249                 pollTimeout.cancel();
250             }
251         }
252     }
253
254     @Override
255     public void setSchedulingEnabled(final boolean schedulingEnabled) {
256         this.schedulingEnabled = schedulingEnabled;
257     }
258
259     @Override
260     public boolean isSchedulingEnabled() {
261         return schedulingEnabled;
262     }
263
264     @Override
265     public void setPollTimeout(final Timeout pollTimeout) {
266         this.pollTimeout = pollTimeout;
267     }
268
269     @Override
270     public Optional<Timeout> getPollTimeout() {
271         return Optional.ofNullable(pollTimeout);
272     }
273
274     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
275         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
276             final String errMsg = String.format("Device connection is closed for Node : %s.",
277                     getDeviceInfo().getNodeId());
278             LOG.debug(errMsg);
279             resultFuture.setException(new IllegalStateException(errMsg));
280             return;
281         }
282         if ( ! iterator.hasNext()) {
283             resultFuture.set(Boolean.TRUE);
284             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
285             return;
286         }
287
288         final MultipartType nextType = iterator.next();
289         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
290
291         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
292         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
293             @Override
294             public void onSuccess(final Boolean result) {
295                 statChainFuture(iterator, resultFuture, initial);
296             }
297             @Override
298             public void onFailure(@Nonnull final Throwable t) {
299                 resultFuture.setException(t);
300             }
301         });
302     }
303
304     /**
305      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
306      * which has to be returned from caller too
307      *
308      * @return
309      */
310     @VisibleForTesting
311     ListenableFuture<Boolean> deviceConnectionCheck() {
312         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
313             ListenableFuture<Boolean> resultingFuture;
314             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
315                 case RIP:
316                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
317                             deviceContext.getPrimaryConnectionContext().getConnectionState());
318                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
319                     break;
320                 default:
321                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
322                     break;
323             }
324             return resultingFuture;
325         }
326         return null;
327     }
328
329     //TODO: Refactor twice sending deviceContext into gatheringStatistics
330     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
331         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
332                 statisticsGatheringOnTheFlyService,
333                 getDeviceInfo(),
334                 /*MultipartType.OFPMPFLOW*/ multipartType,
335                 deviceContext,
336                 deviceContext,
337                 initial, multipartReplyTranslator) : emptyFuture;
338     }
339
340     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
341         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
342                 statisticsGatheringService,
343                 getDeviceInfo(),
344                 /*MultipartType.OFPMPTABLE*/ multipartType,
345                 deviceContext,
346                 deviceContext,
347                 false, multipartReplyTranslator) : emptyFuture;
348     }
349
350     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
351         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
352                 statisticsGatheringService,
353                 getDeviceInfo(),
354                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
355                 deviceContext,
356                 deviceContext,
357                 false, multipartReplyTranslator) : emptyFuture;
358     }
359
360     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
361         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
362                 statisticsGatheringService,
363                 getDeviceInfo(),
364                 /*MultipartType.OFPMPQUEUE*/ multipartType,
365                 deviceContext,
366                 deviceContext,
367                 false, multipartReplyTranslator);
368     }
369
370     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
371         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
372                 statisticsGatheringService,
373                 getDeviceInfo(),
374                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
375                 deviceContext,
376                 deviceContext,
377                 false, multipartReplyTranslator) : emptyFuture;
378     }
379
380     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
381         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
382                 statisticsGatheringService,
383                 getDeviceInfo(),
384                 /*MultipartType.OFPMPGROUP*/ multipartType,
385                 deviceContext,
386                 deviceContext,
387                 false, multipartReplyTranslator) : emptyFuture;
388     }
389
390     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
391         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
392                 statisticsGatheringService,
393                 getDeviceInfo(),
394                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
395                 deviceContext,
396                 deviceContext,
397                 false, multipartReplyTranslator) : emptyFuture;
398     }
399
400     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
401         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
402                 statisticsGatheringService,
403                 getDeviceInfo(),
404                 /*MultipartType.OFPMPMETER*/ multipartType,
405                 deviceContext,
406                 deviceContext,
407                 false, multipartReplyTranslator) : emptyFuture;
408     }
409
410     @VisibleForTesting
411     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
412         this.statisticsGatheringService = statisticsGatheringService;
413     }
414
415     @VisibleForTesting
416     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
417                                                              statisticsGatheringOnTheFlyService) {
418         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
419     }
420
421     @Override
422     public ItemLifecycleListener getItemLifeCycleListener () {
423         return itemLifeCycleListener;
424     }
425
426     @Override
427     public CONTEXT_STATE getState() {
428         return this.state;
429     }
430
431     @Override
432     public ServiceGroupIdentifier getServiceIdentifier() {
433         return this.deviceInfo.getServiceIdentifier();
434     }
435
436     @Override
437     public DeviceInfo getDeviceInfo() {
438         return this.deviceInfo;
439     }
440
441     @Override
442     public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
443         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
444             return Futures.immediateCancelledFuture();
445         }
446
447         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
448             @Nullable
449             @Override
450             public Void apply(@Nullable Object input) {
451                 schedulingEnabled = false;
452                 stopGatheringData();
453                 return null;
454             }
455         });
456     }
457
458     @Override
459     public DeviceState gainDeviceState() {
460         return gainDeviceContext().getDeviceState();
461     }
462
463     @Override
464     public DeviceContext gainDeviceContext() {
465         return this.lifecycleService.getDeviceContext();
466     }
467
468     @Override
469     public void stopGatheringData() {
470         if (Objects.nonNull(this.lastDataGathering)) {
471             if (LOG.isDebugEnabled()) {
472                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
473             }
474
475             lastDataGathering.cancel(true);
476         }
477     }
478
479     @Override
480     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
481         this.clusterInitializationPhaseHandler = handler;
482     }
483
484     @Override
485     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
486         if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
487             LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
488             return false;
489         }
490
491         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
492
493         this.statListForCollectingInitialization();
494         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
495
496             @Override
497             public void onSuccess(@Nullable Boolean aBoolean) {
498                 initialSubmitHandler.initialSubmitTransaction();
499             }
500
501             @Override
502             public void onFailure(Throwable throwable) {
503                 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
504                 lifecycleService.closeConnection();
505             }
506         });
507
508         if (this.isStatisticsPollingOn) {
509             myManager.startScheduling(deviceInfo);
510         }
511
512         return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
513     }
514
515     @Override
516     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
517         this.initialSubmitHandler = initialSubmitHandler;
518     }
519 }