OPNFLWPLUG-983 Group and flow removal stats are not reported in order
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.ConnectionException;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
40 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
41 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
42 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
45 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
52
53     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
54     private static final String CONNECTION_CLOSED = "Connection closed.";
55
56     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
57     private final DeviceContext deviceContext;
58     private final DeviceState devState;
59     private final ListeningExecutorService executorService;
60     private final boolean isStatisticsPollingOn;
61     private final ConvertorExecutor convertorExecutor;
62     private final MultipartWriterProvider statisticsWriterProvider;
63     private final DeviceInfo deviceInfo;
64     private final TimeCounter timeCounter = new TimeCounter();
65     private final long statisticsPollingInterval;
66     private final long maximumPollingDelay;
67     private final boolean isUsingReconciliationFramework;
68     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
69     private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
70     private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
71     private List<MultipartType> collectingStatType;
72     private StatisticsGatheringService<T> statisticsGatheringService;
73     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74     private ContextChainMastershipWatcher contextChainMastershipWatcher;
75
76     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
77                           @Nonnull final ConvertorExecutor convertorExecutor,
78                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
79                           @Nonnull final ListeningExecutorService executorService, boolean isStatisticsPollingOn,
80                           boolean isUsingReconciliationFramework, long statisticsPollingInterval,
81                           long maximumPollingDelay) {
82         this.deviceContext = deviceContext;
83         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
84         this.executorService = executorService;
85         this.isStatisticsPollingOn = isStatisticsPollingOn;
86         this.convertorExecutor = convertorExecutor;
87         this.deviceInfo = deviceContext.getDeviceInfo();
88         this.statisticsPollingInterval = statisticsPollingInterval;
89         this.maximumPollingDelay = maximumPollingDelay;
90         this.statisticsWriterProvider = statisticsWriterProvider;
91         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
92
93         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
94         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
95                                                                                       convertorExecutor,
96                                                                                       statisticsWriterProvider);
97     }
98
99     @Override
100     public DeviceInfo getDeviceInfo() {
101         return this.deviceInfo;
102     }
103
104     @Nonnull
105     @Override
106     public ServiceGroupIdentifier getIdentifier() {
107         return deviceInfo.getServiceIdentifier();
108     }
109
110     @Override
111     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
112         this.contextChainMastershipWatcher = newWatcher;
113     }
114
115     @Override
116     public <O> RequestContext<O> createRequestContext() {
117         final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
118             @Override
119             public void close() {
120                 requestContexts.remove(this);
121             }
122         };
123
124         requestContexts.add(ret);
125         return ret;
126     }
127
128     @Override
129     public void enableGathering() {
130         this.schedulingEnabled.set(true);
131     }
132
133     @Override
134     public void disableGathering() {
135         this.schedulingEnabled.set(false);
136     }
137
138     @Override
139     public void continueInitializationAfterReconciliation() {
140         if (deviceContext.initialSubmitTransaction()) {
141             contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
142
143             startGatheringData();
144         } else {
145             contextChainMastershipWatcher
146                     .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
147         }
148     }
149
150     @Override
151     public void instantiateServiceInstance() {
152         final List<MultipartType> statListForCollecting = new ArrayList<>();
153
154         if (devState.isTableStatisticsAvailable()) {
155             statListForCollecting.add(MultipartType.OFPMPTABLE);
156         }
157
158         if (devState.isGroupAvailable()) {
159             statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
160             statListForCollecting.add(MultipartType.OFPMPGROUP);
161         }
162
163         if (devState.isMetersAvailable()) {
164             statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
165             statListForCollecting.add(MultipartType.OFPMPMETER);
166         }
167
168         if (devState.isFlowStatisticsAvailable()) {
169             statListForCollecting.add(MultipartType.OFPMPFLOW);
170         }
171
172         if (devState.isPortStatisticsAvailable()) {
173             statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
174         }
175
176         if (devState.isQueueStatisticsAvailable()) {
177             statListForCollecting.add(MultipartType.OFPMPQUEUE);
178         }
179
180         collectingStatType = ImmutableList.copyOf(statListForCollecting);
181         Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
182     }
183
184     @Override
185     public ListenableFuture<Void> closeServiceInstance() {
186         return stopGatheringData();
187     }
188
189     @Override
190     public void close() {
191         Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
192             @Override
193             public void onSuccess(@Nullable final Void result) {
194                 requestContexts.forEach(requestContext -> RequestContextUtil
195                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
196             }
197
198             @Override
199             public void onFailure(final Throwable throwable) {
200                 requestContexts.forEach(requestContext -> RequestContextUtil
201                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
202             }
203         }, MoreExecutors.directExecutor());
204     }
205
206     private ListenableFuture<Boolean> gatherDynamicData() {
207         if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
208             LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
209             return Futures.immediateFuture(Boolean.TRUE);
210         }
211
212         return this.lastDataGatheringRef.updateAndGet(future -> {
213             // write start timestamp to state snapshot container
214             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
215
216             // recreate gathering future if it should be recreated
217             final ListenableFuture<Boolean> lastDataGathering =
218                     Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
219                             .immediateFuture(Boolean.TRUE) : future;
220
221             // build statistics gathering future
222             final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
223                     .reduce(lastDataGathering, this::statChainFuture,
224                         (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
225                                 MoreExecutors.directExecutor()));
226
227             // write end timestamp to state snapshot container
228             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
229                 @Override
230                 public void onSuccess(final Boolean result) {
231                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
232                 }
233
234                 @Override
235                 public void onFailure(final Throwable throwable) {
236                     if (!(throwable instanceof TransactionChainClosedException)) {
237                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
238                     }
239                 }
240             }, MoreExecutors.directExecutor());
241
242             return newDataGathering;
243         });
244     }
245
246     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
247                                                       final MultipartType multipartType) {
248         if (ConnectionContext.CONNECTION_STATE.RIP
249                 .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
250             final String errMsg = String
251                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
252                             getDeviceInfo().getNodeId(),
253                             deviceContext.getPrimaryConnectionContext().getConnectionState());
254
255             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
256         }
257
258         return Futures.transformAsync(prevFuture, result -> {
259             LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
260             LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
261             final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
262             final boolean supported = collectingStatType.contains(multipartType);
263
264             // TODO: Refactor twice sending deviceContext into gatheringStatistics
265             return supported ? StatisticsGatheringUtils
266                     .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
267                                       getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
268                                       statisticsWriterProvider, executorService) : Futures
269                     .immediateFuture(Boolean.FALSE);
270         }, MoreExecutors.directExecutor());
271     }
272
273     private void startGatheringData() {
274         if (!isStatisticsPollingOn) {
275             return;
276         }
277
278         LOG.info("Starting statistics gathering for node {}", deviceInfo);
279         final StatisticsPollingService statisticsPollingService =
280                 new StatisticsPollingService(timeCounter,
281                                              statisticsPollingInterval,
282                                              maximumPollingDelay,
283                                              StatisticsContextImpl.this::gatherDynamicData);
284
285         schedulingEnabled.set(true);
286         statisticsPollingService.startAsync();
287         this.statisticsPollingServiceRef.set(statisticsPollingService);
288     }
289
290     private ListenableFuture<Void> stopGatheringData() {
291         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
292         cancelLastDataGathering();
293
294         return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
295                 .orElseGet(() -> Futures.immediateFuture(null));
296     }
297
298     private void cancelLastDataGathering() {
299         final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
300
301         if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
302             future.cancel(true);
303         }
304     }
305
306     @VisibleForTesting
307     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
308         this.statisticsGatheringService = statisticsGatheringService;
309     }
310
311     @VisibleForTesting
312     void setStatisticsGatheringOnTheFlyService(
313             final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
314         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
315     }
316
317     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
318         @Override
319         public void onSuccess(@Nullable final Boolean result) {
320             contextChainMastershipWatcher
321                     .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
322
323             if (!isUsingReconciliationFramework) {
324                 continueInitializationAfterReconciliation();
325             }
326         }
327
328         @Override
329         public void onFailure(@Nonnull final Throwable throwable) {
330             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
331                                                                               "Initial gathering statistics "
332                                                                                       + "unsuccessful: "
333                                                                                       + throwable.getMessage());
334         }
335     }
336 }