Merge "Report (TCP) port number for switches"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
55
56 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
57
58     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
59     private static final String CONNECTION_CLOSED = "Connection closed.";
60
61     private final ItemLifecycleListener itemLifeCycleListener;
62     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
63     private final DeviceContext deviceContext;
64     private final DeviceState devState;
65     private final boolean isStatisticsPollingOn;
66     private final Object collectionStatTypeLock = new Object();
67     private final ConvertorExecutor convertorExecutor;
68     private final MultipartWriterProvider statisticsWriterProvider;
69     @GuardedBy("collectionStatTypeLock")
70     private List<MultipartType> collectingStatType;
71
72     private StatisticsGatheringService<T> statisticsGatheringService;
73     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74     private Timeout pollTimeout;
75     private final DeviceInfo deviceInfo;
76     private final StatisticsManager myManager;
77
78     private volatile boolean schedulingEnabled;
79     private volatile ContextState state;
80     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
81     private ClusterInitializationPhaseHandler initialSubmitHandler;
82
83     private volatile ListenableFuture<Boolean> lastDataGathering;
84
85     StatisticsContextImpl(final boolean isStatisticsPollingOn,
86                           @Nonnull final DeviceContext deviceContext,
87                           @Nonnull final ConvertorExecutor convertorExecutor,
88                           @Nonnull final StatisticsManager myManager,
89                           @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
90         this.deviceContext = deviceContext;
91         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
92         this.isStatisticsPollingOn = isStatisticsPollingOn;
93         this.convertorExecutor = convertorExecutor;
94         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
95         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
96             deviceContext, convertorExecutor, statisticsWriterProvider);
97         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
98         statListForCollectingInitialization();
99         this.state = ContextState.INITIALIZATION;
100         this.deviceInfo = deviceContext.getDeviceInfo();
101         this.myManager = myManager;
102         this.lastDataGathering = null;
103         this.statisticsWriterProvider = statisticsWriterProvider;
104     }
105
106     @Override
107     public void statListForCollectingInitialization() {
108         synchronized (collectionStatTypeLock) {
109             final List<MultipartType> statListForCollecting = new ArrayList<>();
110             if (devState.isTableStatisticsAvailable()) {
111                 statListForCollecting.add(MultipartType.OFPMPTABLE);
112             }
113             if (devState.isFlowStatisticsAvailable()) {
114                 statListForCollecting.add(MultipartType.OFPMPFLOW);
115             }
116             if (devState.isGroupAvailable()) {
117                 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
118                 statListForCollecting.add(MultipartType.OFPMPGROUP);
119             }
120             if (devState.isMetersAvailable()) {
121                 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
122                 statListForCollecting.add(MultipartType.OFPMPMETER);
123             }
124             if (devState.isPortStatisticsAvailable()) {
125                 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
126             }
127             if (devState.isQueueStatisticsAvailable()) {
128                 statListForCollecting.add(MultipartType.OFPMPQUEUE);
129             }
130             collectingStatType = ImmutableList.copyOf(statListForCollecting);
131         }
132     }
133
134     @Override
135     public ListenableFuture<Boolean> gatherDynamicData() {
136         if (!isStatisticsPollingOn) {
137             LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
138             return Futures.immediateFuture(Boolean.TRUE);
139         }
140
141         if (Objects.isNull(lastDataGathering)
142                 || lastDataGathering.isCancelled()
143                 || lastDataGathering.isDone()) {
144             lastDataGathering = Futures.immediateFuture(Boolean.TRUE);
145         }
146
147         synchronized (collectionStatTypeLock) {
148             // write start timestamp to state snapshot container
149             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
150
151             lastDataGathering = collectingStatType.stream().reduce(
152                     lastDataGathering,
153                     this::statChainFuture,
154                     (a, b) -> Futures.transformAsync(a, (AsyncFunction<Boolean, Boolean>) result -> b));
155
156             // write end timestamp to state snapshot container
157             Futures.addCallback(lastDataGathering, new FutureCallback<Boolean>() {
158                 @Override
159                 public void onSuccess(final Boolean result) {
160                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, result);
161                 }
162
163                 @Override
164                 public void onFailure(final Throwable t) {
165                     if (!(t instanceof TransactionChainClosedException)) {
166                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
167                     }
168                 }
169             });
170         }
171
172         return lastDataGathering;
173     }
174
175     private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
176         ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
177
178         switch (multipartType) {
179             case OFPMPFLOW:
180                 result = collectStatistics(multipartType, devState.isFlowStatisticsAvailable(), true);
181                 break;
182             case OFPMPTABLE:
183                 result = collectStatistics(multipartType, devState.isTableStatisticsAvailable(), false);
184                 break;
185             case OFPMPPORTSTATS:
186                 result = collectStatistics(multipartType, devState.isPortStatisticsAvailable(), false);
187                 break;
188             case OFPMPQUEUE:
189                 result = collectStatistics(multipartType, devState.isQueueStatisticsAvailable(), false);
190                 break;
191             case OFPMPGROUPDESC:
192                 result = collectStatistics(multipartType, devState.isGroupAvailable(), false);
193                 break;
194             case OFPMPGROUP:
195                 result = collectStatistics(multipartType, devState.isGroupAvailable(), false);
196                 break;
197             case OFPMPMETERCONFIG:
198                 result = collectStatistics(multipartType, devState.isMetersAvailable(), false);
199                 break;
200             case OFPMPMETER:
201                 result = collectStatistics(multipartType, devState.isMetersAvailable(), false);
202                 break;
203             default:
204                 LOG.warn("Unsupported Statistics type {}", multipartType);
205         }
206
207         return result;
208     }
209
210
211     @Override
212     public <T> RequestContext<T> createRequestContext() {
213         final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
214             @Override
215             public void close() {
216                 requestContexts.remove(this);
217             }
218         };
219         requestContexts.add(ret);
220         return ret;
221     }
222
223     @Override
224     public void close() {
225         if (ContextState.TERMINATION.equals(state)) {
226             if (LOG.isDebugEnabled()) {
227                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
228             }
229         } else {
230             this.state = ContextState.TERMINATION;
231             stopGatheringData();
232
233             for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
234                  iterator.hasNext(); ) {
235                 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
236             }
237         }
238     }
239
240     @Override
241     public void setSchedulingEnabled(final boolean schedulingEnabled) {
242         this.schedulingEnabled = schedulingEnabled;
243     }
244
245     @Override
246     public boolean isSchedulingEnabled() {
247         return schedulingEnabled;
248     }
249
250     @Override
251     public void setPollTimeout(final Timeout pollTimeout) {
252         this.pollTimeout = pollTimeout;
253     }
254
255     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
256         return Futures.transformAsync(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
257                 .transformAsync(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
258                     LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result);
259                     LOG.debug("Stats iterating to next type for node {} of type {}",
260                             deviceInfo.getLOGValue(),
261                             multipartType);
262
263                     return chooseStat(multipartType);
264                 }));
265     }
266
267     @VisibleForTesting
268     ListenableFuture<Boolean> deviceConnectionCheck() {
269         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
270             final String errMsg = String
271                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
272                             getDeviceInfo().getNodeId(),
273                             deviceContext.getPrimaryConnectionContext().getConnectionState());
274
275             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
276         }
277
278         return Futures.immediateFuture(Boolean.TRUE);
279     }
280
281     private ListenableFuture<Boolean> collectStatistics(final MultipartType multipartType,
282                                                         final boolean supported,
283                                                         final boolean onTheFly) {
284         // TODO: Refactor twice sending deviceContext into gatheringStatistics
285         return supported ? StatisticsGatheringUtils.gatherStatistics(
286                 onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
287                 getDeviceInfo(),
288                 multipartType,
289                 deviceContext,
290                 deviceContext,
291                 convertorExecutor,
292                 statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE);
293     }
294
295     @VisibleForTesting
296     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
297         this.statisticsGatheringService = statisticsGatheringService;
298     }
299
300     @VisibleForTesting
301     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
302         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
303     }
304
305     @Override
306     public ItemLifecycleListener getItemLifeCycleListener () {
307         return itemLifeCycleListener;
308     }
309
310     @Override
311     public ServiceGroupIdentifier getServiceIdentifier() {
312         return this.deviceInfo.getServiceIdentifier();
313     }
314
315     @Override
316     public DeviceInfo getDeviceInfo() {
317         return this.deviceInfo;
318     }
319
320     @Override
321     public ListenableFuture<Void> stopClusterServices() {
322         if (ContextState.TERMINATION.equals(this.state)) {
323             return Futures.immediateCancelledFuture();
324         }
325
326         return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
327             @Nullable
328             @Override
329             public Void apply(@Nullable Object input) {
330                 schedulingEnabled = false;
331                 stopGatheringData();
332                 return null;
333             }
334         });
335     }
336
337     @Override
338     public DeviceState gainDeviceState() {
339         return gainDeviceContext().getDeviceState();
340     }
341
342     @Override
343     public DeviceContext gainDeviceContext() {
344         return this.deviceContext;
345     }
346
347     @Override
348     public void stopGatheringData() {
349         LOG.info("Stopping running statistics gathering for node {}", deviceInfo.getLOGValue());
350
351         if (Objects.nonNull(lastDataGathering) && !lastDataGathering.isDone() && !lastDataGathering.isCancelled()) {
352             lastDataGathering.cancel(true);
353         }
354
355         if (Objects.nonNull(pollTimeout) && !pollTimeout.isExpired()) {
356             pollTimeout.cancel();
357         }
358     }
359
360     @Override
361     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
362         this.clusterInitializationPhaseHandler = handler;
363     }
364
365     @Override
366     public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
367         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
368         this.statListForCollectingInitialization();
369
370         Futures.addCallback(this.gatherDynamicData(), new FutureCallback<Boolean>() {
371             @Override
372             public void onSuccess(@Nullable Boolean aBoolean) {
373                 mastershipChangeListener.onMasterRoleAcquired(
374                         deviceInfo,
375                         ContextChainMastershipState.INITIAL_GATHERING
376                 );
377
378                 if (initialSubmitHandler.initialSubmitTransaction()) {
379                     mastershipChangeListener.onMasterRoleAcquired(
380                             deviceInfo,
381                             ContextChainMastershipState.INITIAL_SUBMIT
382                     );
383
384                     if (isStatisticsPollingOn) {
385                         myManager.startScheduling(deviceInfo);
386                     }
387                 } else {
388                     mastershipChangeListener.onNotAbleToStartMastershipMandatory(
389                             deviceInfo,
390                             "Initial transaction cannot be submitted."
391                     );
392                 }
393             }
394
395             @Override
396             public void onFailure(@Nonnull Throwable throwable) {
397                 mastershipChangeListener.onNotAbleToStartMastershipMandatory(
398                         deviceInfo,
399                         "Initial gathering statistics unsuccessful."
400                 );
401             }
402         });
403
404         return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
405     }
406
407     @Override
408     public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
409         this.initialSubmitHandler = initialSubmitHandler;
410     }
411 }