Merge "Bug 5596 Cleaning part 2"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
40 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
41 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
42 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
45 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 class StatisticsContextImpl implements StatisticsContext {
51
52     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
53     private static final String CONNECTION_CLOSED = "Connection closed.";
54
55     private final ItemLifecycleListener itemLifeCycleListener;
56     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
57     private final DeviceContext deviceContext;
58     private final DeviceState devState;
59     private final ListenableFuture<Boolean> emptyFuture;
60     private final boolean shuttingDownStatisticsPolling;
61     private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
62     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
63     @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
64     private List<MultipartType> collectingStatType;
65
66     private StatisticsGatheringService statisticsGatheringService;
67     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
68     private Timeout pollTimeout;
69     private final DeviceInfo deviceInfo;
70     private final StatisticsManager myManager;
71
72     private volatile boolean schedulingEnabled;
73     private volatile CONTEXT_STATE state;
74
75     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
76                           @Nonnull final boolean shuttingDownStatisticsPolling,
77                           @Nonnull final LifecycleConductor lifecycleConductor,
78                           @Nonnull final ConvertorExecutor convertorExecutor,
79                           @Nonnull final StatisticsManager myManager) {
80         this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo));
81         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
82         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
83         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
84         emptyFuture = Futures.immediateFuture(false);
85         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
86         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
87         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
88         statListForCollectingInitialization();
89         setState(CONTEXT_STATE.INITIALIZATION);
90         this.deviceInfo = deviceInfo;
91         this.myManager = myManager;
92     }
93
94     @Override
95     public void statListForCollectingInitialization() {
96         synchronized (COLLECTION_STAT_TYPE_LOCK) {
97             final List<MultipartType> statListForCollecting = new ArrayList<>();
98             if (devState.isTableStatisticsAvailable()) {
99                 statListForCollecting.add(MultipartType.OFPMPTABLE);
100             }
101             if (devState.isFlowStatisticsAvailable()) {
102                 statListForCollecting.add(MultipartType.OFPMPFLOW);
103             }
104             if (devState.isGroupAvailable()) {
105                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
106                 statListForCollecting.add(MultipartType.OFPMPGROUP);
107             }
108             if (devState.isMetersAvailable()) {
109                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
110                 statListForCollecting.add(MultipartType.OFPMPMETER);
111             }
112             if (devState.isPortStatisticsAvailable()) {
113                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
114             }
115             if (devState.isQueueStatisticsAvailable()) {
116                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
117             }
118             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
119         }
120     }
121
122
123     @Override
124     public ListenableFuture<Boolean> initialGatherDynamicData() {
125         return gatherDynamicData(true);
126     }
127
128     @Override
129     public ListenableFuture<Boolean> gatherDynamicData(){
130         return gatherDynamicData(false);
131     }
132
133     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
134         if (shuttingDownStatisticsPolling) {
135             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
136             return Futures.immediateFuture(Boolean.TRUE);
137         }
138         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
139         if (errorResultFuture != null) {
140             return errorResultFuture;
141         }
142         synchronized (COLLECTION_STAT_TYPE_LOCK) {
143             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
144             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
145
146             // write start timestamp to state snapshot container
147             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
148
149             statChainFuture(statIterator, settableStatResultFuture, initial);
150
151             // write end timestamp to state snapshot container
152             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
153                 @Override
154                 public void onSuccess(@Nullable final Boolean result) {
155                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
156                 }
157                 @Override
158                 public void onFailure(final Throwable t) {
159                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
160                 }
161             });
162             return settableStatResultFuture;
163         }
164     }
165
166     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
167         switch (multipartType) {
168             case OFPMPFLOW:
169                 return collectFlowStatistics(multipartType, initial);
170             case OFPMPTABLE:
171                 return collectTableStatistics(multipartType);
172             case OFPMPPORTSTATS:
173                 return collectPortStatistics(multipartType);
174             case OFPMPQUEUE:
175                 return collectQueueStatistics(multipartType);
176             case OFPMPGROUPDESC:
177                 return collectGroupDescStatistics(multipartType);
178             case OFPMPGROUP:
179                 return collectGroupStatistics(multipartType);
180             case OFPMPMETERCONFIG:
181                 return collectMeterConfigStatistics(multipartType);
182             case OFPMPMETER:
183                 return collectMeterStatistics(multipartType);
184             default:
185                 LOG.warn("Unsuported Statistics type {}", multipartType);
186                 return Futures.immediateCheckedFuture(Boolean.TRUE);
187         }
188     }
189
190
191     @Override
192     public <T> RequestContext<T> createRequestContext() {
193         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reserveXidForDeviceMessage()) {
194             @Override
195             public void close() {
196                 requestContexts.remove(this);
197             }
198         };
199         requestContexts.add(ret);
200         return ret;
201     }
202
203     @Override
204     public void close() {
205         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
206             if (LOG.isDebugEnabled()) {
207                 LOG.debug("Statistics context is already in state TERMINATION.");
208             }
209         } else {
210             setState(CONTEXT_STATE.TERMINATION);
211             schedulingEnabled = false;
212             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
213                  iterator.hasNext(); ) {
214                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
215             }
216             if (null != pollTimeout && !pollTimeout.isExpired()) {
217                 pollTimeout.cancel();
218             }
219         }
220     }
221
222     @Override
223     public void setSchedulingEnabled(final boolean schedulingEnabled) {
224         this.schedulingEnabled = schedulingEnabled;
225     }
226
227     @Override
228     public boolean isSchedulingEnabled() {
229         return schedulingEnabled;
230     }
231
232     @Override
233     public void setPollTimeout(final Timeout pollTimeout) {
234         this.pollTimeout = pollTimeout;
235     }
236
237     @Override
238     public Optional<Timeout> getPollTimeout() {
239         return Optional.ofNullable(pollTimeout);
240     }
241
242     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
243         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
244             final String errMsg = String.format("Device connection is closed for Node : %s.",
245                     getDeviceInfo().getNodeId());
246             LOG.debug(errMsg);
247             resultFuture.setException(new IllegalStateException(errMsg));
248             return;
249         }
250         if ( ! iterator.hasNext()) {
251             resultFuture.set(Boolean.TRUE);
252             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
253             return;
254         }
255
256         final MultipartType nextType = iterator.next();
257         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
258
259         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
260         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
261             @Override
262             public void onSuccess(final Boolean result) {
263                 statChainFuture(iterator, resultFuture, initial);
264             }
265             @Override
266             public void onFailure(@Nonnull final Throwable t) {
267                 resultFuture.setException(t);
268             }
269         });
270     }
271
272     /**
273      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
274      * which has to be returned from caller too
275      *
276      * @return
277      */
278     @VisibleForTesting
279     ListenableFuture<Boolean> deviceConnectionCheck() {
280         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
281             ListenableFuture<Boolean> resultingFuture;
282             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
283                 case RIP:
284                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
285                             deviceContext.getPrimaryConnectionContext().getConnectionState());
286                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
287                     break;
288                 default:
289                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
290                     break;
291             }
292             return resultingFuture;
293         }
294         return null;
295     }
296
297     //TODO: Refactor twice sending deviceContext into gatheringStatistics
298     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
299         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
300                 statisticsGatheringOnTheFlyService,
301                 getDeviceInfo(),
302                 /*MultipartType.OFPMPFLOW*/ multipartType,
303                 deviceContext,
304                 deviceContext,
305                 initial, multipartReplyTranslator) : emptyFuture;
306     }
307
308     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
309         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
310                 statisticsGatheringService,
311                 getDeviceInfo(),
312                 /*MultipartType.OFPMPTABLE*/ multipartType,
313                 deviceContext,
314                 deviceContext,
315                 false, multipartReplyTranslator) : emptyFuture;
316     }
317
318     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
319         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
320                 statisticsGatheringService,
321                 getDeviceInfo(),
322                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
323                 deviceContext,
324                 deviceContext,
325                 false, multipartReplyTranslator) : emptyFuture;
326     }
327
328     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
329         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
330                 statisticsGatheringService,
331                 getDeviceInfo(),
332                 /*MultipartType.OFPMPQUEUE*/ multipartType,
333                 deviceContext,
334                 deviceContext,
335                 false, multipartReplyTranslator);
336     }
337
338     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
339         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
340                 statisticsGatheringService,
341                 getDeviceInfo(),
342                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
343                 deviceContext,
344                 deviceContext,
345                 false, multipartReplyTranslator) : emptyFuture;
346     }
347
348     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
349         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
350                 statisticsGatheringService,
351                 getDeviceInfo(),
352                 /*MultipartType.OFPMPGROUP*/ multipartType,
353                 deviceContext,
354                 deviceContext,
355                 false, multipartReplyTranslator) : emptyFuture;
356     }
357
358     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
359         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
360                 statisticsGatheringService,
361                 getDeviceInfo(),
362                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
363                 deviceContext,
364                 deviceContext,
365                 false, multipartReplyTranslator) : emptyFuture;
366     }
367
368     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
369         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
370                 statisticsGatheringService,
371                 getDeviceInfo(),
372                 /*MultipartType.OFPMPMETER*/ multipartType,
373                 deviceContext,
374                 deviceContext,
375                 false, multipartReplyTranslator) : emptyFuture;
376     }
377
378     @VisibleForTesting
379     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
380         this.statisticsGatheringService = statisticsGatheringService;
381     }
382
383     @VisibleForTesting
384     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
385                                                              statisticsGatheringOnTheFlyService) {
386         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
387     }
388
389     @Override
390     public ItemLifecycleListener getItemLifeCycleListener () {
391         return itemLifeCycleListener;
392     }
393
394     @Override
395     public CONTEXT_STATE getState() {
396         return this.state;
397     }
398
399     @Override
400     public void setState(CONTEXT_STATE state) {
401         this.state = state;
402     }
403
404     @Override
405     public ServiceGroupIdentifier getServiceIdentifier() {
406         return this.deviceInfo.getServiceIdentifier();
407     }
408
409     @Override
410     public DeviceInfo getDeviceInfo() {
411         return this.deviceInfo;
412     }
413
414     @Override
415     public void startupClusterServices() throws ExecutionException, InterruptedException {
416         if (!this.shuttingDownStatisticsPolling) {
417             this.statListForCollectingInitialization();
418             this.initialGatherDynamicData();
419             myManager.startScheduling(deviceInfo);
420         }
421     }
422
423     @Override
424     public ListenableFuture<Void> stopClusterServices() {
425         myManager.stopScheduling(deviceInfo);
426         return Futures.immediateFuture(null);
427     }
428 }