Bug 5596 Change of start up services
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
39 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
40 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
41 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
42 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
44 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 class StatisticsContextImpl implements StatisticsContext {
50
51     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
52     private static final String CONNECTION_CLOSED = "Connection closed.";
53
54     private final ItemLifecycleListener itemLifeCycleListener;
55     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
56     private final DeviceContext deviceContext;
57     private final DeviceState devState;
58     private final ListenableFuture<Boolean> emptyFuture;
59     private final boolean shuttingDownStatisticsPolling;
60     private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
61     private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
62     @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
63     private List<MultipartType> collectingStatType;
64
65     private StatisticsGatheringService statisticsGatheringService;
66     private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
67     private Timeout pollTimeout;
68     private final DeviceInfo deviceInfo;
69
70     private volatile boolean schedulingEnabled;
71     private volatile CONTEXT_STATE state;
72
73     StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
74                           @Nonnull final boolean shuttingDownStatisticsPolling,
75                           @Nonnull final LifecycleConductor lifecycleConductor,
76                           @Nonnull final ConvertorExecutor convertorExecutor) {
77         this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo));
78         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
79         this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
80         multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
81         emptyFuture = Futures.immediateFuture(false);
82         statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
83         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
84         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
85         statListForCollectingInitialization();
86         setState(CONTEXT_STATE.INITIALIZATION);
87         this.deviceInfo = deviceInfo;
88     }
89
90     @Override
91     public void statListForCollectingInitialization() {
92         synchronized (COLLECTION_STAT_TYPE_LOCK) {
93             final List<MultipartType> statListForCollecting = new ArrayList<>();
94             if (devState.isTableStatisticsAvailable()) {
95                 statListForCollecting.add(MultipartType.OFPMPTABLE);
96             }
97             if (devState.isFlowStatisticsAvailable()) {
98                 statListForCollecting.add(MultipartType.OFPMPFLOW);
99             }
100             if (devState.isGroupAvailable()) {
101                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
102                 statListForCollecting.add(MultipartType.OFPMPGROUP);
103             }
104             if (devState.isMetersAvailable()) {
105                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
106                 statListForCollecting.add(MultipartType.OFPMPMETER);
107             }
108             if (devState.isPortStatisticsAvailable()) {
109                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
110             }
111             if (devState.isQueueStatisticsAvailable()) {
112                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
113             }
114             collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
115         }
116     }
117
118
119     @Override
120     public ListenableFuture<Boolean> initialGatherDynamicData() {
121         return gatherDynamicData(true);
122     }
123
124     @Override
125     public ListenableFuture<Boolean> gatherDynamicData(){
126         return gatherDynamicData(false);
127     }
128
129     private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
130         if (shuttingDownStatisticsPolling) {
131             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId());
132             return Futures.immediateFuture(Boolean.TRUE);
133         }
134         final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
135         if (errorResultFuture != null) {
136             return errorResultFuture;
137         }
138         synchronized (COLLECTION_STAT_TYPE_LOCK) {
139             final Iterator<MultipartType> statIterator = collectingStatType.iterator();
140             final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
141
142             // write start timestamp to state snapshot container
143             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
144
145             statChainFuture(statIterator, settableStatResultFuture, initial);
146
147             // write end timestamp to state snapshot container
148             Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
149                 @Override
150                 public void onSuccess(@Nullable final Boolean result) {
151                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
152                 }
153                 @Override
154                 public void onFailure(final Throwable t) {
155                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
156                 }
157             });
158             return settableStatResultFuture;
159         }
160     }
161
162     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
163         switch (multipartType) {
164             case OFPMPFLOW:
165                 return collectFlowStatistics(multipartType, initial);
166             case OFPMPTABLE:
167                 return collectTableStatistics(multipartType);
168             case OFPMPPORTSTATS:
169                 return collectPortStatistics(multipartType);
170             case OFPMPQUEUE:
171                 return collectQueueStatistics(multipartType);
172             case OFPMPGROUPDESC:
173                 return collectGroupDescStatistics(multipartType);
174             case OFPMPGROUP:
175                 return collectGroupStatistics(multipartType);
176             case OFPMPMETERCONFIG:
177                 return collectMeterConfigStatistics(multipartType);
178             case OFPMPMETER:
179                 return collectMeterStatistics(multipartType);
180             default:
181                 LOG.warn("Unsuported Statistics type {}", multipartType);
182                 return Futures.immediateCheckedFuture(Boolean.TRUE);
183         }
184     }
185
186
187     @Override
188     public <T> RequestContext<T> createRequestContext() {
189         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reserveXidForDeviceMessage()) {
190             @Override
191             public void close() {
192                 requestContexts.remove(this);
193             }
194         };
195         requestContexts.add(ret);
196         return ret;
197     }
198
199     @Override
200     public void close() {
201         if (CONTEXT_STATE.TERMINATION.equals(getState())) {
202             if (LOG.isDebugEnabled()) {
203                 LOG.debug("Statistics context is already in state TERMINATION.");
204             }
205         } else {
206             setState(CONTEXT_STATE.TERMINATION);
207             schedulingEnabled = false;
208             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
209                  iterator.hasNext(); ) {
210                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
211             }
212             if (null != pollTimeout && !pollTimeout.isExpired()) {
213                 pollTimeout.cancel();
214             }
215         }
216     }
217
218     @Override
219     public void setSchedulingEnabled(final boolean schedulingEnabled) {
220         this.schedulingEnabled = schedulingEnabled;
221     }
222
223     @Override
224     public boolean isSchedulingEnabled() {
225         return schedulingEnabled;
226     }
227
228     @Override
229     public void setPollTimeout(final Timeout pollTimeout) {
230         this.pollTimeout = pollTimeout;
231     }
232
233     @Override
234     public Optional<Timeout> getPollTimeout() {
235         return Optional.ofNullable(pollTimeout);
236     }
237
238     private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
239         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
240             final String errMsg = String.format("Device connection is closed for Node : %s.",
241                     getDeviceInfo().getNodeId());
242             LOG.debug(errMsg);
243             resultFuture.setException(new IllegalStateException(errMsg));
244             return;
245         }
246         if ( ! iterator.hasNext()) {
247             resultFuture.set(Boolean.TRUE);
248             LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
249             return;
250         }
251
252         final MultipartType nextType = iterator.next();
253         LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
254
255         final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
256         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
257             @Override
258             public void onSuccess(final Boolean result) {
259                 statChainFuture(iterator, resultFuture, initial);
260             }
261             @Override
262             public void onFailure(@Nonnull final Throwable t) {
263                 resultFuture.setException(t);
264             }
265         });
266     }
267
268     /**
269      * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
270      * which has to be returned from caller too
271      *
272      * @return
273      */
274     @VisibleForTesting
275     ListenableFuture<Boolean> deviceConnectionCheck() {
276         if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
277             ListenableFuture<Boolean> resultingFuture;
278             switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
279                 case RIP:
280                     final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
281                             deviceContext.getPrimaryConnectionContext().getConnectionState());
282                     resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
283                     break;
284                 default:
285                     resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
286                     break;
287             }
288             return resultingFuture;
289         }
290         return null;
291     }
292
293     //TODO: Refactor twice sending deviceContext into gatheringStatistics
294     private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
295         return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
296                 statisticsGatheringOnTheFlyService,
297                 getDeviceInfo(),
298                 /*MultipartType.OFPMPFLOW*/ multipartType,
299                 deviceContext,
300                 deviceContext,
301                 initial, multipartReplyTranslator) : emptyFuture;
302     }
303
304     private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
305         return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
306                 statisticsGatheringService,
307                 getDeviceInfo(),
308                 /*MultipartType.OFPMPTABLE*/ multipartType,
309                 deviceContext,
310                 deviceContext,
311                 false, multipartReplyTranslator) : emptyFuture;
312     }
313
314     private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
315         return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
316                 statisticsGatheringService,
317                 getDeviceInfo(),
318                 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
319                 deviceContext,
320                 deviceContext,
321                 false, multipartReplyTranslator) : emptyFuture;
322     }
323
324     private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
325         return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
326                 statisticsGatheringService,
327                 getDeviceInfo(),
328                 /*MultipartType.OFPMPQUEUE*/ multipartType,
329                 deviceContext,
330                 deviceContext,
331                 false, multipartReplyTranslator);
332     }
333
334     private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
335         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
336                 statisticsGatheringService,
337                 getDeviceInfo(),
338                 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
339                 deviceContext,
340                 deviceContext,
341                 false, multipartReplyTranslator) : emptyFuture;
342     }
343
344     private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
345         return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
346                 statisticsGatheringService,
347                 getDeviceInfo(),
348                 /*MultipartType.OFPMPGROUP*/ multipartType,
349                 deviceContext,
350                 deviceContext,
351                 false, multipartReplyTranslator) : emptyFuture;
352     }
353
354     private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
355         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
356                 statisticsGatheringService,
357                 getDeviceInfo(),
358                 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
359                 deviceContext,
360                 deviceContext,
361                 false, multipartReplyTranslator) : emptyFuture;
362     }
363
364     private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
365         return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
366                 statisticsGatheringService,
367                 getDeviceInfo(),
368                 /*MultipartType.OFPMPMETER*/ multipartType,
369                 deviceContext,
370                 deviceContext,
371                 false, multipartReplyTranslator) : emptyFuture;
372     }
373
374     @VisibleForTesting
375     void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
376         this.statisticsGatheringService = statisticsGatheringService;
377     }
378
379     @VisibleForTesting
380     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
381                                                              statisticsGatheringOnTheFlyService) {
382         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
383     }
384
385     @Override
386     public ItemLifecycleListener getItemLifeCycleListener () {
387         return itemLifeCycleListener;
388     }
389
390     @Override
391     public CONTEXT_STATE getState() {
392         return this.state;
393     }
394
395     @Override
396     public void setState(CONTEXT_STATE state) {
397         this.state = state;
398     }
399
400     @Override
401     public ServiceGroupIdentifier getServiceIdentifier() {
402         return this.deviceInfo.getServiceIdentifier();
403     }
404
405     @Override
406     public DeviceInfo getDeviceInfo() {
407         return this.deviceInfo;
408     }
409
410     @Override
411     public void startupClusterServices() throws ExecutionException, InterruptedException {
412         this.statListForCollectingInitialization();
413         this.initialGatherDynamicData();
414     }
415
416     @Override
417     public ListenableFuture<Void> stopClusterServices() {
418         return Futures.immediateFuture(null);
419     }
420 }