Merge "Bug 6380 lldp-speaker - DTCL instead of DTL"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
41 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
42 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
43 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 class StatisticsContextImpl implements StatisticsContext {
52
53     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
54     private static final String CONNECTION_CLOSED = "Connection closed.";
55
56     private final ItemLifecycleListener itemLifeCycleListener;
57     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
58     private final DeviceContext deviceContext;
59     private final DeviceState devState;
60     private final ListenableFuture<Boolean> emptyFuture;
61     private final boolean shuttingDownStatisticsPolling;
62     private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
63     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
64     @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
65     private List<MultipartType> collectingStatType;
66
67     private StatisticsGatheringService statisticsGatheringService;
68     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
69     private Timeout pollTimeout;
70     private final DeviceInfo deviceInfo;
71     private final StatisticsManager myManager;
72     private final LifecycleService lifecycleService;
73
74     private volatile boolean schedulingEnabled;
75     private volatile CONTEXT_STATE state;
76
77     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
78                           final boolean shuttingDownStatisticsPolling,
79                           @Nonnull final LifecycleService lifecycleService,
80                           @Nonnull final ConvertorExecutor convertorExecutor,
81                           @Nonnull final StatisticsManager myManager) {
82         this.lifecycleService = lifecycleService;
83         this.deviceContext = lifecycleService.getDeviceContext();
84         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
85         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
86         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
87         emptyFuture = Futures.immediateFuture(false);
88         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
89         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
90         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
91         statListForCollectingInitialization();
92         setState(CONTEXT_STATE.INITIALIZATION);
93         this.deviceInfo = deviceInfo;
94         this.myManager = myManager;
95     }
96
97     @Override
98     public void statListForCollectingInitialization() {
99         synchronized (COLLECTION_STAT_TYPE_LOCK) {
100             final List<MultipartType> statListForCollecting = new ArrayList<>();
101             if (devState.isTableStatisticsAvailable()) {
102                 statListForCollecting.add(MultipartType.OFPMPTABLE);
103             }
104             if (devState.isFlowStatisticsAvailable()) {
105                 statListForCollecting.add(MultipartType.OFPMPFLOW);
106             }
107             if (devState.isGroupAvailable()) {
108                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
109                 statListForCollecting.add(MultipartType.OFPMPGROUP);
110             }
111             if (devState.isMetersAvailable()) {
112                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
113                 statListForCollecting.add(MultipartType.OFPMPMETER);
114             }
115             if (devState.isPortStatisticsAvailable()) {
116                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
117             }
118             if (devState.isQueueStatisticsAvailable()) {
119                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
120             }
121             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
122         }
123     }
124
125
126     @Override
127     public ListenableFuture<Boolean> initialGatherDynamicData() {
128         return gatherDynamicData(true);
129     }
130
131     @Override
132     public ListenableFuture<Boolean> gatherDynamicData(){
133         return gatherDynamicData(false);
134     }
135
136     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
137         if (shuttingDownStatisticsPolling) {
138             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
139             return Futures.immediateFuture(Boolean.TRUE);
140         }
141         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
142         if (errorResultFuture != null) {
143             return errorResultFuture;
144         }
145         synchronized (COLLECTION_STAT_TYPE_LOCK) {
146             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
147             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
148
149             // write start timestamp to state snapshot container
150             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
151
152             statChainFuture(statIterator, settableStatResultFuture, initial);
153
154             // write end timestamp to state snapshot container
155             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
156                 @Override
157                 public void onSuccess(@Nullable final Boolean result) {
158                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
159                 }
160                 @Override
161                 public void onFailure(final Throwable t) {
162                     if (!(t instanceof TransactionChainClosedException)) {
163                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
164                     }
165                 }
166             });
167             return settableStatResultFuture;
168         }
169     }
170
171     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
172         switch (multipartType) {
173             case OFPMPFLOW:
174                 return collectFlowStatistics(multipartType, initial);
175             case OFPMPTABLE:
176                 return collectTableStatistics(multipartType);
177             case OFPMPPORTSTATS:
178                 return collectPortStatistics(multipartType);
179             case OFPMPQUEUE:
180                 return collectQueueStatistics(multipartType);
181             case OFPMPGROUPDESC:
182                 return collectGroupDescStatistics(multipartType);
183             case OFPMPGROUP:
184                 return collectGroupStatistics(multipartType);
185             case OFPMPMETERCONFIG:
186                 return collectMeterConfigStatistics(multipartType);
187             case OFPMPMETER:
188                 return collectMeterStatistics(multipartType);
189             default:
190                 LOG.warn("Unsuported Statistics type {}", multipartType);
191                 return Futures.immediateCheckedFuture(Boolean.TRUE);
192         }
193     }
194
195
196     @Override
197     public <T> RequestContext<T> createRequestContext() {
198         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
199             @Override
200             public void close() {
201                 requestContexts.remove(this);
202             }
203         };
204         requestContexts.add(ret);
205         return ret;
206     }
207
208     @Override
209     public void close() {
210         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
211             if (LOG.isDebugEnabled()) {
212                 LOG.debug("Statistics context is already in state TERMINATION.");
213             }
214         } else {
215             setState(CONTEXT_STATE.TERMINATION);
216             schedulingEnabled = false;
217             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
218                  iterator.hasNext(); ) {
219                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
220             }
221             if (null != pollTimeout && !pollTimeout.isExpired()) {
222                 pollTimeout.cancel();
223             }
224         }
225     }
226
227     @Override
228     public void setSchedulingEnabled(final boolean schedulingEnabled) {
229         this.schedulingEnabled = schedulingEnabled;
230     }
231
232     @Override
233     public boolean isSchedulingEnabled() {
234         return schedulingEnabled;
235     }
236
237     @Override
238     public void setPollTimeout(final Timeout pollTimeout) {
239         this.pollTimeout = pollTimeout;
240     }
241
242     @Override
243     public Optional<Timeout> getPollTimeout() {
244         return Optional.ofNullable(pollTimeout);
245     }
246
247     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
248         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
249             final String errMsg = String.format("Device connection is closed for Node : %s.",
250                     getDeviceInfo().getNodeId());
251             LOG.debug(errMsg);
252             resultFuture.setException(new IllegalStateException(errMsg));
253             return;
254         }
255         if ( ! iterator.hasNext()) {
256             resultFuture.set(Boolean.TRUE);
257             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
258             return;
259         }
260
261         final MultipartType nextType = iterator.next();
262         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
263
264         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
265         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
266             @Override
267             public void onSuccess(final Boolean result) {
268                 statChainFuture(iterator, resultFuture, initial);
269             }
270             @Override
271             public void onFailure(@Nonnull final Throwable t) {
272                 resultFuture.setException(t);
273             }
274         });
275     }
276
277     /**
278      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
279      * which has to be returned from caller too
280      *
281      * @return
282      */
283     @VisibleForTesting
284     ListenableFuture<Boolean> deviceConnectionCheck() {
285         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
286             ListenableFuture<Boolean> resultingFuture;
287             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
288                 case RIP:
289                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
290                             deviceContext.getPrimaryConnectionContext().getConnectionState());
291                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
292                     break;
293                 default:
294                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
295                     break;
296             }
297             return resultingFuture;
298         }
299         return null;
300     }
301
302     //TODO: Refactor twice sending deviceContext into gatheringStatistics
303     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
304         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
305                 statisticsGatheringOnTheFlyService,
306                 getDeviceInfo(),
307                 /*MultipartType.OFPMPFLOW*/ multipartType,
308                 deviceContext,
309                 deviceContext,
310                 initial, multipartReplyTranslator) : emptyFuture;
311     }
312
313     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
314         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
315                 statisticsGatheringService,
316                 getDeviceInfo(),
317                 /*MultipartType.OFPMPTABLE*/ multipartType,
318                 deviceContext,
319                 deviceContext,
320                 false, multipartReplyTranslator) : emptyFuture;
321     }
322
323     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
324         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
325                 statisticsGatheringService,
326                 getDeviceInfo(),
327                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
328                 deviceContext,
329                 deviceContext,
330                 false, multipartReplyTranslator) : emptyFuture;
331     }
332
333     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
334         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
335                 statisticsGatheringService,
336                 getDeviceInfo(),
337                 /*MultipartType.OFPMPQUEUE*/ multipartType,
338                 deviceContext,
339                 deviceContext,
340                 false, multipartReplyTranslator);
341     }
342
343     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
344         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
345                 statisticsGatheringService,
346                 getDeviceInfo(),
347                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
348                 deviceContext,
349                 deviceContext,
350                 false, multipartReplyTranslator) : emptyFuture;
351     }
352
353     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
354         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
355                 statisticsGatheringService,
356                 getDeviceInfo(),
357                 /*MultipartType.OFPMPGROUP*/ multipartType,
358                 deviceContext,
359                 deviceContext,
360                 false, multipartReplyTranslator) : emptyFuture;
361     }
362
363     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
364         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
365                 statisticsGatheringService,
366                 getDeviceInfo(),
367                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
368                 deviceContext,
369                 deviceContext,
370                 false, multipartReplyTranslator) : emptyFuture;
371     }
372
373     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
374         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
375                 statisticsGatheringService,
376                 getDeviceInfo(),
377                 /*MultipartType.OFPMPMETER*/ multipartType,
378                 deviceContext,
379                 deviceContext,
380                 false, multipartReplyTranslator) : emptyFuture;
381     }
382
383     @VisibleForTesting
384     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
385         this.statisticsGatheringService = statisticsGatheringService;
386     }
387
388     @VisibleForTesting
389     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
390                                                              statisticsGatheringOnTheFlyService) {
391         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
392     }
393
394     @Override
395     public ItemLifecycleListener getItemLifeCycleListener () {
396         return itemLifeCycleListener;
397     }
398
399     @Override
400     public CONTEXT_STATE getState() {
401         return this.state;
402     }
403
404     @Override
405     public void setState(CONTEXT_STATE state) {
406         this.state = state;
407     }
408
409     @Override
410     public ServiceGroupIdentifier getServiceIdentifier() {
411         return this.deviceInfo.getServiceIdentifier();
412     }
413
414     @Override
415     public DeviceInfo getDeviceInfo() {
416         return this.deviceInfo;
417     }
418
419     @Override
420     public void startupClusterServices() throws ExecutionException, InterruptedException {
421         if (!this.shuttingDownStatisticsPolling) {
422             this.statListForCollectingInitialization();
423             this.initialGatherDynamicData();
424             myManager.startScheduling(deviceInfo);
425         }
426     }
427
428     @Override
429     public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
430         myManager.stopScheduling(deviceInfo);
431         return Futures.immediateFuture(null);
432     }
433
434     @Override
435     public LifecycleService getLifecycleService() {
436         return lifecycleService;
437     }
438 }