OPNFLWPLUG-1032: Neon-MRI: Bump odlparent, yangtools, mdsal
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29
30 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.ConnectionException;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
37 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
42 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
43 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
54
55     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
56     private static final String CONNECTION_CLOSED = "Connection closed.";
57
58     private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
59     private final DeviceContext deviceContext;
60     private final DeviceState devState;
61     private final ListeningExecutorService executorService;
62     private final boolean isStatisticsPollingOn;
63     private final ConvertorExecutor convertorExecutor;
64     private final MultipartWriterProvider statisticsWriterProvider;
65     private final DeviceInfo deviceInfo;
66     private final TimeCounter timeCounter = new TimeCounter();
67     private final OpenflowProviderConfig config;
68     private final long statisticsPollingInterval;
69     private final long maximumPollingDelay;
70     private final boolean isUsingReconciliationFramework;
71     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
72     private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
73     private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
74     private List<MultipartType> collectingStatType;
75     private StatisticsGatheringService<T> statisticsGatheringService;
76     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
77     private ContextChainMastershipWatcher contextChainMastershipWatcher;
78
79     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
80                           @Nonnull final ConvertorExecutor convertorExecutor,
81                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
82                           @Nonnull final ListeningExecutorService executorService,
83                           @Nonnull final OpenflowProviderConfig config,
84                           boolean isStatisticsPollingOn,
85                           boolean isUsingReconciliationFramework) {
86         this.deviceContext = deviceContext;
87         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
88         this.executorService = executorService;
89         this.isStatisticsPollingOn = isStatisticsPollingOn;
90         this.config = config;
91         this.convertorExecutor = convertorExecutor;
92         this.deviceInfo = deviceContext.getDeviceInfo();
93         this.statisticsPollingInterval = config.getBasicTimerDelay().getValue();
94         this.maximumPollingDelay = config.getMaximumTimerDelay().getValue();
95         this.statisticsWriterProvider = statisticsWriterProvider;
96         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
97
98         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
99         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
100                                                                                       convertorExecutor,
101                                                                                       statisticsWriterProvider);
102     }
103
104     @Override
105     public DeviceInfo getDeviceInfo() {
106         return this.deviceInfo;
107     }
108
109     @Nonnull
110     @Override
111     public ServiceGroupIdentifier getIdentifier() {
112         return deviceInfo.getServiceIdentifier();
113     }
114
115     @Override
116     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
117         this.contextChainMastershipWatcher = newWatcher;
118     }
119
120     @Override
121     public <O> RequestContext<O> createRequestContext() {
122         final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
123             @Override
124             public void close() {
125                 requestContexts.remove(this);
126             }
127         };
128
129         requestContexts.add(ret);
130         return ret;
131     }
132
133     @Override
134     public void enableGathering() {
135         this.schedulingEnabled.set(true);
136     }
137
138     @Override
139     public void disableGathering() {
140         this.schedulingEnabled.set(false);
141     }
142
143     @Override
144     public void continueInitializationAfterReconciliation() {
145         if (deviceContext.initialSubmitTransaction()) {
146             contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
147
148             startGatheringData();
149         } else {
150             contextChainMastershipWatcher
151                     .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
152         }
153     }
154
155     @Override
156     public void instantiateServiceInstance() {
157         final List<MultipartType> statListForCollecting = new ArrayList<>();
158
159         if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
160             statListForCollecting.add(MultipartType.OFPMPTABLE);
161         }
162
163         if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
164             statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
165             statListForCollecting.add(MultipartType.OFPMPGROUP);
166         }
167
168         if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
169             statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
170             statListForCollecting.add(MultipartType.OFPMPMETER);
171         }
172
173         if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
174             statListForCollecting.add(MultipartType.OFPMPFLOW);
175         }
176
177         if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
178             statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
179         }
180
181         if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
182             statListForCollecting.add(MultipartType.OFPMPQUEUE);
183         }
184
185         collectingStatType = ImmutableList.copyOf(statListForCollecting);
186         Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
187     }
188
189     @Override
190     public ListenableFuture<Void> closeServiceInstance() {
191         return stopGatheringData();
192     }
193
194     @Override
195     public void close() {
196         Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
197             @Override
198             public void onSuccess(@Nullable final Void result) {
199                 requestContexts.forEach(requestContext -> RequestContextUtil
200                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
201             }
202
203             @Override
204             public void onFailure(final Throwable throwable) {
205                 requestContexts.forEach(requestContext -> RequestContextUtil
206                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
207             }
208         }, MoreExecutors.directExecutor());
209     }
210
211     private ListenableFuture<Boolean> gatherDynamicData() {
212         if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
213             LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
214             return Futures.immediateFuture(Boolean.TRUE);
215         }
216
217         return this.lastDataGatheringRef.updateAndGet(future -> {
218             // write start timestamp to state snapshot container
219             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
220
221             // recreate gathering future if it should be recreated
222             final ListenableFuture<Boolean> lastDataGathering =
223                     Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
224                             .immediateFuture(Boolean.TRUE) : future;
225
226             // build statistics gathering future
227             final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
228                     .reduce(lastDataGathering, this::statChainFuture,
229                         (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
230                                 MoreExecutors.directExecutor()));
231
232             // write end timestamp to state snapshot container
233             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
234                 @Override
235                 public void onSuccess(@Nonnull final Boolean result) {
236                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
237                 }
238
239                 @Override
240                 public void onFailure(final Throwable throwable) {
241                     if (!(throwable instanceof TransactionChainClosedException)) {
242                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
243                     }
244                 }
245             }, MoreExecutors.directExecutor());
246
247             return newDataGathering;
248         });
249     }
250
251     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
252                                                       final MultipartType multipartType) {
253         if (ConnectionContext.CONNECTION_STATE.RIP
254                 .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
255             final String errMsg = String
256                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
257                             getDeviceInfo().getNodeId(),
258                             deviceContext.getPrimaryConnectionContext().getConnectionState());
259
260             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
261         }
262
263         return Futures.transformAsync(prevFuture, result -> {
264             LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
265             LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
266             final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
267             final boolean supported = collectingStatType.contains(multipartType);
268
269             // TODO: Refactor twice sending deviceContext into gatheringStatistics
270             return supported ? StatisticsGatheringUtils
271                     .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
272                                       getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
273                                       statisticsWriterProvider, executorService) : Futures
274                     .immediateFuture(Boolean.FALSE);
275         }, MoreExecutors.directExecutor());
276     }
277
278     private void startGatheringData() {
279         if (!isStatisticsPollingOn) {
280             return;
281         }
282
283         LOG.info("Starting statistics gathering for node {}", deviceInfo);
284         final StatisticsPollingService statisticsPollingService =
285                 new StatisticsPollingService(timeCounter,
286                                              statisticsPollingInterval,
287                                              maximumPollingDelay,
288                                              StatisticsContextImpl.this::gatherDynamicData);
289
290         schedulingEnabled.set(true);
291         statisticsPollingService.startAsync();
292         this.statisticsPollingServiceRef.set(statisticsPollingService);
293     }
294
295     private ListenableFuture<Void> stopGatheringData() {
296         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
297         cancelLastDataGathering();
298
299         return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
300                 .orElseGet(() -> Futures.immediateFuture(null));
301     }
302
303     private void cancelLastDataGathering() {
304         final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
305
306         if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
307             future.cancel(true);
308         }
309     }
310
311     @VisibleForTesting
312     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
313         this.statisticsGatheringService = statisticsGatheringService;
314     }
315
316     @VisibleForTesting
317     void setStatisticsGatheringOnTheFlyService(
318             final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
319         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
320     }
321
322     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
323         @Override
324         public void onSuccess(@Nullable final Boolean result) {
325             contextChainMastershipWatcher
326                     .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
327
328             if (!isUsingReconciliationFramework) {
329                 continueInitializationAfterReconciliation();
330             }
331         }
332
333         @Override
334         public void onFailure(@Nonnull final Throwable throwable) {
335             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
336                                                                               "Initial gathering statistics "
337                                                                                       + "unsuccessful: "
338                                                                                       + throwable.getMessage());
339         }
340     }
341 }