Merge "Bug 8217: Set error information into direct statistics RPC result."
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.ConnectionException;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
42 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
45 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
46 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
47 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
48 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
49 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
50 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
51 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
58
59     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
60     private static final String CONNECTION_CLOSED = "Connection closed.";
61
62     private final ItemLifecycleListener itemLifeCycleListener;
63     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
64     private final DeviceContext deviceContext;
65     private final DeviceState devState;
66     private final ListenableFuture<Boolean> emptyFuture;
67     private final boolean isStatisticsPollingOn;
68     private final Object collectionStatTypeLock = new Object();
69     private final ConvertorExecutor convertorExecutor;
70     private final MultipartWriterProvider statisticsWriterProvider;
71     @GuardedBy("collectionStatTypeLock")
72     private List<MultipartType> collectingStatType;
73
74     private StatisticsGatheringService<T> statisticsGatheringService;
75     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
76     private Timeout pollTimeout;
77     private final DeviceInfo deviceInfo;
78     private final StatisticsManager myManager;
79
80     private volatile boolean schedulingEnabled;
81     private volatile CONTEXT_STATE state;
82     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
83     private ClusterInitializationPhaseHandler initialSubmitHandler;
84
85     private ListenableFuture<Boolean> lastDataGathering;
86
87     StatisticsContextImpl(final boolean isStatisticsPollingOn,
88                           @Nonnull final DeviceContext deviceContext,
89                           @Nonnull final ConvertorExecutor convertorExecutor,
90                           @Nonnull final StatisticsManager myManager,
91                           @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
92         this.deviceContext = deviceContext;
93         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
94         this.isStatisticsPollingOn = isStatisticsPollingOn;
95         this.convertorExecutor = convertorExecutor;
96         emptyFuture = Futures.immediateFuture(false);
97         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
98         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
99             deviceContext, convertorExecutor, statisticsWriterProvider);
100         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
101         statListForCollectingInitialization();
102         this.state = CONTEXT_STATE.INITIALIZATION;
103         this.deviceInfo = deviceContext.getDeviceInfo();
104         this.myManager = myManager;
105         this.lastDataGathering = null;
106         this.statisticsWriterProvider = statisticsWriterProvider;
107     }
108
109     @Override
110     public void statListForCollectingInitialization() {
111         synchronized (collectionStatTypeLock) {
112             final List<MultipartType> statListForCollecting = new ArrayList<>();
113             if (devState.isTableStatisticsAvailable()) {
114                 statListForCollecting.add(MultipartType.OFPMPTABLE);
115             }
116             if (devState.isFlowStatisticsAvailable()) {
117                 statListForCollecting.add(MultipartType.OFPMPFLOW);
118             }
119             if (devState.isGroupAvailable()) {
120                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
121                 statListForCollecting.add(MultipartType.OFPMPGROUP);
122             }
123             if (devState.isMetersAvailable()) {
124                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
125                 statListForCollecting.add(MultipartType.OFPMPMETER);
126             }
127             if (devState.isPortStatisticsAvailable()) {
128                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
129             }
130             if (devState.isQueueStatisticsAvailable()) {
131                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
132             }
133             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
134         }
135     }
136
137
138     @Override
139     public ListenableFuture<Boolean> initialGatherDynamicData() {
140         return gatherDynamicData(true);
141     }
142
143     @Override
144     public ListenableFuture<Boolean> gatherDynamicData(){
145         return gatherDynamicData(false);
146     }
147
148     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
149         this.lastDataGathering = null;
150         if (!isStatisticsPollingOn) {
151             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
152             return Futures.immediateFuture(Boolean.TRUE);
153         }
154         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
155         if (errorResultFuture != null) {
156             return errorResultFuture;
157         }
158         synchronized (collectionStatTypeLock) {
159             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
160             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
161
162             // write start timestamp to state snapshot container
163             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
164
165             statChainFuture(statIterator, settableStatResultFuture, initial);
166
167             // write end timestamp to state snapshot container
168             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
169                 @Override
170                 public void onSuccess(@Nullable final Boolean result) {
171                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
172                 }
173                 @Override
174                 public void onFailure(final Throwable t) {
175                     if (!(t instanceof TransactionChainClosedException)) {
176                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
177                     }
178                 }
179             });
180             this.lastDataGathering = settableStatResultFuture;
181             return settableStatResultFuture;
182         }
183     }
184
185     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
186         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
187
188         switch (multipartType) {
189             case OFPMPFLOW:
190                 result = collectFlowStatistics(multipartType, initial);
191                 break;
192             case OFPMPTABLE:
193                 result = collectTableStatistics(multipartType);
194                 break;
195             case OFPMPPORTSTATS:
196                 result = collectPortStatistics(multipartType);
197                 break;
198             case OFPMPQUEUE:
199                 result = collectQueueStatistics(multipartType);
200                 break;
201             case OFPMPGROUPDESC:
202                 result = collectGroupDescStatistics(multipartType);
203                 break;
204             case OFPMPGROUP:
205                 result = collectGroupStatistics(multipartType);
206                 break;
207             case OFPMPMETERCONFIG:
208                 result = collectMeterConfigStatistics(multipartType);
209                 break;
210             case OFPMPMETER:
211                 result = collectMeterStatistics(multipartType);
212                 break;
213             default:
214                 LOG.warn("Unsupported Statistics type {}", multipartType);
215         }
216
217         return result;
218     }
219
220
221     @Override
222     public <T> RequestContext<T> createRequestContext() {
223         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
224             @Override
225             public void close() {
226                 requestContexts.remove(this);
227             }
228         };
229         requestContexts.add(ret);
230         return ret;
231     }
232
233     @Override
234     public void close() {
235         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
236             if (LOG.isDebugEnabled()) {
237                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
238             }
239         } else {
240
241             this.state = CONTEXT_STATE.TERMINATION;
242
243             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
244                  iterator.hasNext(); ) {
245                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
246             }
247
248             if (null != pollTimeout && !pollTimeout.isExpired()) {
249                 pollTimeout.cancel();
250             }
251         }
252     }
253
254     @Override
255     public void setSchedulingEnabled(final boolean schedulingEnabled) {
256         this.schedulingEnabled = schedulingEnabled;
257     }
258
259     @Override
260     public boolean isSchedulingEnabled() {
261         return schedulingEnabled;
262     }
263
264     @Override
265     public void setPollTimeout(final Timeout pollTimeout) {
266         this.pollTimeout = pollTimeout;
267     }
268
269     @Override
270     public Optional<Timeout> getPollTimeout() {
271         return Optional.ofNullable(pollTimeout);
272     }
273
274     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
275         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
276             final String errMsg = String.format("Device connection is closed for Node : %s.",
277                     getDeviceInfo().getNodeId());
278             LOG.debug(errMsg);
279             resultFuture.setException(new ConnectionException(errMsg));
280             return;
281         }
282
283         if (!iterator.hasNext()) {
284             if (initial) {
285                 Futures.addCallback(StatisticsGatheringUtils.gatherStatistics(
286                         statisticsGatheringService,
287                         getDeviceInfo(),
288                         MultipartType.OFPMPPORTDESC,
289                         deviceContext,
290                         deviceContext,
291                         false,
292                         convertorExecutor,
293                         statisticsWriterProvider), new FutureCallback<Boolean>() {
294                     @Override
295                     public void onSuccess(final Boolean result) {
296                         statChainFuture(iterator, resultFuture, false);
297                     }
298                     @Override
299                     public void onFailure(@Nonnull final Throwable t) {
300                         resultFuture.setException(t);
301                     }
302                 });
303
304                 return;
305             }
306
307             resultFuture.set(Boolean.TRUE);
308             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
309             return;
310         }
311
312         final MultipartType nextType = iterator.next();
313         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
314
315         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
316         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
317             @Override
318             public void onSuccess(final Boolean result) {
319                 statChainFuture(iterator, resultFuture, initial);
320             }
321             @Override
322             public void onFailure(@Nonnull final Throwable t) {
323                 resultFuture.setException(t);
324             }
325         });
326     }
327
328     /**
329      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
330      * which has to be returned from caller too
331      *
332      * @return future
333      */
334     @VisibleForTesting
335     ListenableFuture<Boolean> deviceConnectionCheck() {
336         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
337             ListenableFuture<Boolean> resultingFuture;
338             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
339                 case RIP:
340                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
341                         deviceContext.getPrimaryConnectionContext().getConnectionState());
342                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
343                     break;
344                 default:
345                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
346                     break;
347             }
348             return resultingFuture;
349         }
350         return null;
351     }
352
353     //TODO: Refactor twice sending deviceContext into gatheringStatistics
354     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
355         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
356             statisticsGatheringOnTheFlyService,
357             getDeviceInfo(),
358             /*MultipartType.OFPMPFLOW*/ multipartType,
359             deviceContext,
360             deviceContext,
361             initial,
362             convertorExecutor,
363             statisticsWriterProvider) : emptyFuture;
364     }
365
366     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
367         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
368             statisticsGatheringService,
369             getDeviceInfo(),
370                 /*MultipartType.OFPMPTABLE*/ multipartType,
371             deviceContext,
372             deviceContext,
373             false,
374             convertorExecutor,
375             statisticsWriterProvider) : emptyFuture;
376     }
377
378     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
379         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
380             statisticsGatheringService,
381             getDeviceInfo(),
382                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
383             deviceContext,
384             deviceContext,
385             false,
386             convertorExecutor,
387             statisticsWriterProvider) : emptyFuture;
388     }
389
390     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
391         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
392             statisticsGatheringService,
393             getDeviceInfo(),
394                 /*MultipartType.OFPMPQUEUE*/ multipartType,
395             deviceContext,
396             deviceContext,
397             false,
398             convertorExecutor,
399             statisticsWriterProvider);
400     }
401
402     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
403         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
404             statisticsGatheringService,
405             getDeviceInfo(),
406                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
407             deviceContext,
408             deviceContext,
409             false,
410             convertorExecutor,
411             statisticsWriterProvider) : emptyFuture;
412     }
413
414     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
415         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
416             statisticsGatheringService,
417             getDeviceInfo(),
418                 /*MultipartType.OFPMPGROUP*/ multipartType,
419             deviceContext,
420             deviceContext,
421             false,
422             convertorExecutor,
423             statisticsWriterProvider) : emptyFuture;
424     }
425
426     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
427         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
428             statisticsGatheringService,
429             getDeviceInfo(),
430                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
431             deviceContext,
432             deviceContext,
433             false,
434             convertorExecutor,
435             statisticsWriterProvider) : emptyFuture;
436     }
437
438     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
439         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
440             statisticsGatheringService,
441             getDeviceInfo(),
442                 /*MultipartType.OFPMPMETER*/ multipartType,
443             deviceContext,
444             deviceContext,
445             false,
446             convertorExecutor,
447             statisticsWriterProvider) : emptyFuture;
448     }
449
450     @VisibleForTesting
451     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
452         this.statisticsGatheringService = statisticsGatheringService;
453     }
454
455     @VisibleForTesting
456     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
457         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
458     }
459
460     @Override
461     public ItemLifecycleListener getItemLifeCycleListener () {
462         return itemLifeCycleListener;
463     }
464
465     @Override
466     public CONTEXT_STATE getState() {
467         return this.state;
468     }
469
470     @Override
471     public ServiceGroupIdentifier getServiceIdentifier() {
472         return this.deviceInfo.getServiceIdentifier();
473     }
474
475     @Override
476     public DeviceInfo getDeviceInfo() {
477         return this.deviceInfo;
478     }
479
480     @Override
481     public ListenableFuture<Void> stopClusterServices() {
482         if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
483             return Futures.immediateCancelledFuture();
484         }
485
486         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
487             @Nullable
488             @Override
489             public Void apply(@Nullable Object input) {
490                 schedulingEnabled = false;
491                 stopGatheringData();
492                 return null;
493             }
494         });
495     }
496
497     @Override
498     public DeviceState gainDeviceState() {
499         return gainDeviceContext().getDeviceState();
500     }
501
502     @Override
503     public DeviceContext gainDeviceContext() {
504         return this.deviceContext;
505     }
506
507     @Override
508     public void stopGatheringData() {
509         if (Objects.nonNull(this.lastDataGathering)) {
510             if (LOG.isDebugEnabled()) {
511                 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
512             }
513
514             lastDataGathering.cancel(true);
515         }
516     }
517
518     @Override
519     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
520         this.clusterInitializationPhaseHandler = handler;
521     }
522
523     @Override
524     public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
525
526         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
527
528         this.statListForCollectingInitialization();
529         Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
530
531             @Override
532             public void onSuccess(@Nullable Boolean aBoolean) {
533                 mastershipChangeListener.onMasterRoleAcquired(
534                         deviceInfo,
535                         ContextChainMastershipState.INITIAL_GATHERING
536                 );
537                 if (initialSubmitHandler.initialSubmitTransaction()) {
538                     mastershipChangeListener.onMasterRoleAcquired(
539                             deviceInfo,
540                             ContextChainMastershipState.INITIAL_SUBMIT
541                     );
542                     if (isStatisticsPollingOn) {
543                         myManager.startScheduling(deviceInfo);
544                     }
545                 } else {
546                     mastershipChangeListener.onNotAbleToStartMastership(
547                             deviceInfo,
548                             "Initial transaction cannot be submitted."
549                     );
550                 }
551             }
552
553             @Override
554             public void onFailure(@Nonnull Throwable throwable) {
555                 mastershipChangeListener.onNotAbleToStartMastership(
556                         deviceInfo,
557                         "Initial gathering statistics unsuccessful."
558                 );
559             }
560         });
561
562         return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
563     }
564
565     @Override
566     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
567         this.initialSubmitHandler = initialSubmitHandler;
568     }
569 }