Fix spotbugs logging complaints
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.statistics;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.openflowplugin.api.ConnectionException;
30 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
34 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
39 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
40 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
41 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
42 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
43 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
51
52     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
53     private static final String CONNECTION_CLOSED = "Connection closed.";
54
55     private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
56     private final DeviceContext deviceContext;
57     private final DeviceState devState;
58     private final ListeningExecutorService executorService;
59     private final boolean isStatisticsPollingOn;
60     private final ConvertorExecutor convertorExecutor;
61     private final MultipartWriterProvider statisticsWriterProvider;
62     private final DeviceInfo deviceInfo;
63     private final TimeCounter timeCounter = new TimeCounter();
64     private final OpenflowProviderConfig config;
65     private final long statisticsPollingInterval;
66     private final long maximumPollingDelay;
67     private final boolean isUsingReconciliationFramework;
68     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
69     private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
70     private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
71     private List<MultipartType> collectingStatType;
72     private StatisticsGatheringService<T> statisticsGatheringService;
73     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74     private ContextChainMastershipWatcher contextChainMastershipWatcher;
75
76     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
77                           @Nonnull final ConvertorExecutor convertorExecutor,
78                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
79                           @Nonnull final ListeningExecutorService executorService,
80                           @Nonnull final OpenflowProviderConfig config,
81                           boolean isStatisticsPollingOn,
82                           boolean isUsingReconciliationFramework) {
83         this.deviceContext = deviceContext;
84         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
85         this.executorService = executorService;
86         this.isStatisticsPollingOn = isStatisticsPollingOn;
87         this.config = config;
88         this.convertorExecutor = convertorExecutor;
89         this.deviceInfo = deviceContext.getDeviceInfo();
90         this.statisticsPollingInterval = config.getBasicTimerDelay().getValue();
91         this.maximumPollingDelay = config.getMaximumTimerDelay().getValue();
92         this.statisticsWriterProvider = statisticsWriterProvider;
93         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
94
95         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
96         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
97                                                                                       convertorExecutor,
98                                                                                       statisticsWriterProvider);
99     }
100
101     @Override
102     public DeviceInfo getDeviceInfo() {
103         return this.deviceInfo;
104     }
105
106     @Nonnull
107     @Override
108     public ServiceGroupIdentifier getIdentifier() {
109         return deviceInfo.getServiceIdentifier();
110     }
111
112     @Override
113     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
114         this.contextChainMastershipWatcher = newWatcher;
115     }
116
117     @Override
118     public <O> RequestContext<O> createRequestContext() {
119         final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
120             @Override
121             public void close() {
122                 requestContexts.remove(this);
123             }
124         };
125
126         requestContexts.add(ret);
127         return ret;
128     }
129
130     @Override
131     public void enableGathering() {
132         this.schedulingEnabled.set(true);
133     }
134
135     @Override
136     public void disableGathering() {
137         this.schedulingEnabled.set(false);
138     }
139
140     @Override
141     public void continueInitializationAfterReconciliation() {
142         if (deviceContext.initialSubmitTransaction()) {
143             contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
144
145             startGatheringData();
146         } else {
147             contextChainMastershipWatcher
148                     .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
149         }
150     }
151
152     @Override
153     public void instantiateServiceInstance() {
154         final List<MultipartType> statListForCollecting = new ArrayList<>();
155
156         if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
157             statListForCollecting.add(MultipartType.OFPMPTABLE);
158         }
159
160         if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
161             statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
162             statListForCollecting.add(MultipartType.OFPMPGROUP);
163         }
164
165         if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
166             statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
167             statListForCollecting.add(MultipartType.OFPMPMETER);
168         }
169
170         if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
171             statListForCollecting.add(MultipartType.OFPMPFLOW);
172         }
173
174         if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
175             statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
176         }
177
178         if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
179             statListForCollecting.add(MultipartType.OFPMPQUEUE);
180         }
181
182         collectingStatType = ImmutableList.copyOf(statListForCollecting);
183         Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
184     }
185
186     @Override
187     public ListenableFuture<Void> closeServiceInstance() {
188         return stopGatheringData();
189     }
190
191     @Override
192     public void close() {
193         Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
194             @Override
195             public void onSuccess(@Nullable final Void result) {
196                 requestContexts.forEach(requestContext -> RequestContextUtil
197                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
198             }
199
200             @Override
201             public void onFailure(final Throwable throwable) {
202                 requestContexts.forEach(requestContext -> RequestContextUtil
203                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
204             }
205         }, MoreExecutors.directExecutor());
206     }
207
208     private ListenableFuture<Boolean> gatherDynamicData() {
209         if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
210             LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
211             return Futures.immediateFuture(Boolean.TRUE);
212         }
213
214         return this.lastDataGatheringRef.updateAndGet(future -> {
215             // write start timestamp to state snapshot container
216             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
217
218             // recreate gathering future if it should be recreated
219             final ListenableFuture<Boolean> lastDataGathering = future == null || future.isCancelled()
220                     || future.isDone() ? Futures.immediateFuture(Boolean.TRUE) : future;
221
222             // build statistics gathering future
223             final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
224                     .reduce(lastDataGathering, this::statChainFuture,
225                         (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
226                                 MoreExecutors.directExecutor()));
227
228             // write end timestamp to state snapshot container
229             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
230                 @Override
231                 public void onSuccess(@Nonnull final Boolean result) {
232                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
233                 }
234
235                 @Override
236                 public void onFailure(final Throwable throwable) {
237                     if (!(throwable instanceof TransactionChainClosedException)) {
238                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
239                     }
240                 }
241             }, MoreExecutors.directExecutor());
242
243             return newDataGathering;
244         });
245     }
246
247     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
248                                                       final MultipartType multipartType) {
249         if (ConnectionContext.CONNECTION_STATE.RIP
250                 .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
251             final String errMsg = String
252                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
253                             getDeviceInfo().getNodeId(),
254                             deviceContext.getPrimaryConnectionContext().getConnectionState());
255
256             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
257         }
258
259         return Futures.transformAsync(prevFuture, result -> {
260             LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
261             LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
262             final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
263             final boolean supported = collectingStatType.contains(multipartType);
264
265             // TODO: Refactor twice sending deviceContext into gatheringStatistics
266             return supported ? StatisticsGatheringUtils
267                     .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
268                                       getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
269                                       statisticsWriterProvider, executorService) : Futures
270                     .immediateFuture(Boolean.FALSE);
271         }, MoreExecutors.directExecutor());
272     }
273
274     private void startGatheringData() {
275         if (!isStatisticsPollingOn) {
276             return;
277         }
278
279         LOG.info("Starting statistics gathering for node {}", deviceInfo);
280         final StatisticsPollingService statisticsPollingService =
281                 new StatisticsPollingService(timeCounter,
282                                              statisticsPollingInterval,
283                                              maximumPollingDelay,
284                                              StatisticsContextImpl.this::gatherDynamicData);
285
286         schedulingEnabled.set(true);
287         statisticsPollingService.startAsync();
288         this.statisticsPollingServiceRef.set(statisticsPollingService);
289     }
290
291     private ListenableFuture<Void> stopGatheringData() {
292         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
293         cancelLastDataGathering();
294
295         return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
296                 .orElseGet(() -> Futures.immediateFuture(null));
297     }
298
299     private void cancelLastDataGathering() {
300         final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
301
302         if (future != null && !future.isDone() && !future.isCancelled()) {
303             future.cancel(true);
304         }
305     }
306
307     @VisibleForTesting
308     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
309         this.statisticsGatheringService = statisticsGatheringService;
310     }
311
312     @VisibleForTesting
313     void setStatisticsGatheringOnTheFlyService(
314             final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
315         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
316     }
317
318     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
319         @Override
320         public void onSuccess(@Nullable final Boolean result) {
321             contextChainMastershipWatcher
322                     .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
323
324             if (!isUsingReconciliationFramework) {
325                 continueInitializationAfterReconciliation();
326             }
327         }
328
329         @Override
330         public void onFailure(@Nonnull final Throwable throwable) {
331             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
332                                                                               "Initial gathering statistics "
333                                                                                       + "unsuccessful: "
334                                                                                       + throwable.getMessage());
335         }
336     }
337 }