Fix codestyle
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.statistics;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.openflowplugin.api.ConnectionException;
30 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
34 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
39 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
40 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
41 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
42 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
45 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
52
53     private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
54     private static final String CONNECTION_CLOSED = "Connection closed.";
55
56     private final ItemLifecycleListener itemLifeCycleListener;
57     private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
58     private final DeviceContext deviceContext;
59     private final DeviceState devState;
60     private final boolean isStatisticsPollingOn;
61     private final ConvertorExecutor convertorExecutor;
62     private final MultipartWriterProvider statisticsWriterProvider;
63     private final DeviceInfo deviceInfo;
64     private final TimeCounter timeCounter = new TimeCounter();
65     private final long statisticsPollingInterval;
66     private final long maximumPollingDelay;
67     private final boolean isUsingReconciliationFramework;
68     private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
69     private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
70     private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
71     private List<MultipartType> collectingStatType;
72     private StatisticsGatheringService<T> statisticsGatheringService;
73     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74     private ContextChainMastershipWatcher contextChainMastershipWatcher;
75
76     StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
77                           @Nonnull final ConvertorExecutor convertorExecutor,
78                           @Nonnull final MultipartWriterProvider statisticsWriterProvider,
79                           boolean isStatisticsPollingOn,
80                           boolean isUsingReconciliationFramework,
81                           long statisticsPollingInterval,
82                           long maximumPollingDelay) {
83         this.deviceContext = deviceContext;
84         this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
85         this.isStatisticsPollingOn = isStatisticsPollingOn;
86         this.convertorExecutor = convertorExecutor;
87         this.itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
88         this.deviceInfo = deviceContext.getDeviceInfo();
89         this.statisticsPollingInterval = statisticsPollingInterval;
90         this.maximumPollingDelay = maximumPollingDelay;
91         this.statisticsWriterProvider = statisticsWriterProvider;
92         this.isUsingReconciliationFramework = isUsingReconciliationFramework;
93
94         statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
95         statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
96                 deviceContext, convertorExecutor, statisticsWriterProvider);
97     }
98
99     @Override
100     public DeviceInfo getDeviceInfo() {
101         return this.deviceInfo;
102     }
103
104     @Nonnull
105     @Override
106     public ServiceGroupIdentifier getIdentifier() {
107         return deviceInfo.getServiceIdentifier();
108     }
109
110     @Override
111     public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
112         this.contextChainMastershipWatcher = contextChainMastershipWatcher;
113     }
114
115     @Override
116     public <O> RequestContext<O> createRequestContext() {
117         final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
118             @Override
119             public void close() {
120                 requestContexts.remove(this);
121             }
122         };
123
124         requestContexts.add(ret);
125         return ret;
126     }
127
128     @Override
129     public void enableGathering() {
130         this.schedulingEnabled.set(true);
131         deviceContext.getItemLifeCycleSourceRegistry()
132                 .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource
133                 .setItemLifecycleListener(null));
134     }
135
136     @Override
137     public void disableGathering() {
138         this.schedulingEnabled.set(false);
139         deviceContext.getItemLifeCycleSourceRegistry()
140                 .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource
141                 .setItemLifecycleListener(itemLifeCycleListener));
142     }
143
144     @Override
145     public void continueInitializationAfterReconciliation() {
146         if (deviceContext.initialSubmitTransaction()) {
147             contextChainMastershipWatcher.onMasterRoleAcquired(
148                     deviceInfo,
149                     ContextChainMastershipState.INITIAL_SUBMIT);
150
151             startGatheringData();
152         } else {
153             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
154                     deviceInfo,
155                     "Initial transaction cannot be submitted.");
156         }
157     }
158
159     @Override
160     public void instantiateServiceInstance() {
161         final List<MultipartType> statListForCollecting = new ArrayList<>();
162
163         if (devState.isTableStatisticsAvailable()) {
164             statListForCollecting.add(MultipartType.OFPMPTABLE);
165         }
166
167         if (devState.isFlowStatisticsAvailable()) {
168             statListForCollecting.add(MultipartType.OFPMPFLOW);
169         }
170
171         if (devState.isGroupAvailable()) {
172             statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
173             statListForCollecting.add(MultipartType.OFPMPGROUP);
174         }
175
176         if (devState.isMetersAvailable()) {
177             statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
178             statListForCollecting.add(MultipartType.OFPMPMETER);
179         }
180
181         if (devState.isPortStatisticsAvailable()) {
182             statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
183         }
184
185         if (devState.isQueueStatisticsAvailable()) {
186             statListForCollecting.add(MultipartType.OFPMPQUEUE);
187         }
188
189         collectingStatType = ImmutableList.copyOf(statListForCollecting);
190         Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback());
191     }
192
193     @Override
194     public ListenableFuture<Void> closeServiceInstance() {
195         return stopGatheringData();
196     }
197
198     @Override
199     public void close() {
200          Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
201             @Override
202             public void onSuccess(@Nullable final Void result) {
203                 requestContexts.forEach(requestContext -> RequestContextUtil
204                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
205             }
206
207             @Override
208             public void onFailure(final Throwable t) {
209                 requestContexts.forEach(requestContext -> RequestContextUtil
210                         .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
211             }
212         });
213     }
214
215     private ListenableFuture<Boolean> gatherDynamicData() {
216         if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
217             LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
218             return Futures.immediateFuture(Boolean.TRUE);
219         }
220
221         return this.lastDataGathering.updateAndGet(future -> {
222             // write start timestamp to state snapshot container
223             StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
224
225             // recreate gathering future if it should be recreated
226             final ListenableFuture<Boolean> lastDataGathering = Objects.isNull(future) ||
227                     future.isCancelled() ||
228                     future.isDone() ?
229                     Futures.immediateFuture(Boolean.TRUE) :
230                     future;
231
232             // build statistics gathering future
233             final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream().reduce(
234                     lastDataGathering,
235                     this::statChainFuture,
236                     (a, b) -> Futures.transformAsync(a, result -> b));
237
238             // write end timestamp to state snapshot container
239             Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
240                 @Override
241                 public void onSuccess(final Boolean result) {
242                     StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
243                 }
244
245                 @Override
246                 public void onFailure(final Throwable t) {
247                     if (!(t instanceof TransactionChainClosedException)) {
248                         StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
249                     }
250                 }
251             });
252
253             return newDataGathering;
254         });
255     }
256
257     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
258         if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
259             final String errMsg = String
260                     .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
261                             getDeviceInfo().getNodeId(),
262                             deviceContext.getPrimaryConnectionContext().getConnectionState());
263
264             return Futures.immediateFailedFuture(new ConnectionException(errMsg));
265         }
266
267         return Futures.transformAsync(prevFuture, result -> {
268             LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
269             LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
270             final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
271             final boolean supported = collectingStatType.contains(multipartType);
272
273             // TODO: Refactor twice sending deviceContext into gatheringStatistics
274             return supported ? StatisticsGatheringUtils.gatherStatistics(
275                     onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
276                     getDeviceInfo(),
277                     multipartType,
278                     deviceContext,
279                     deviceContext,
280                     convertorExecutor,
281                     statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE);
282         });
283     }
284
285     private void startGatheringData() {
286         if (!isStatisticsPollingOn) {
287             return;
288         }
289
290         LOG.info("Starting statistics gathering for node {}", deviceInfo);
291         final StatisticsPollingService statisticsPollingService = new StatisticsPollingService(timeCounter,
292                 statisticsPollingInterval, maximumPollingDelay,
293                 StatisticsContextImpl.this::gatherDynamicData);
294
295         schedulingEnabled.set(true);
296         statisticsPollingService.startAsync();
297         this.statisticsPollingService.set(statisticsPollingService);
298     }
299
300     private ListenableFuture<Void> stopGatheringData() {
301         LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
302         cancelLastDataGathering();
303
304         return Optional
305                 .ofNullable(statisticsPollingService.getAndSet(null))
306                 .map(StatisticsPollingService::stop)
307                 .orElseGet(() -> Futures.immediateFuture(null));
308     }
309
310     private void cancelLastDataGathering() {
311         final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
312
313         if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
314             future.cancel(true);
315         }
316     }
317
318     @VisibleForTesting
319     void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
320         this.statisticsGatheringService = statisticsGatheringService;
321     }
322
323     @VisibleForTesting
324     void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
325         this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
326     }
327
328     private final class InitialSubmitCallback implements FutureCallback<Boolean> {
329         @Override
330         public void onSuccess(@Nullable final Boolean result) {
331             contextChainMastershipWatcher.onMasterRoleAcquired(
332                     deviceInfo,
333                     ContextChainMastershipState.INITIAL_GATHERING
334             );
335
336             if (!isUsingReconciliationFramework) {
337                 continueInitializationAfterReconciliation();
338             }
339         }
340
341         @Override
342         public void onFailure(@Nonnull final Throwable t) {
343             contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
344                     deviceInfo,
345                     "Initial gathering statistics unsuccessful: " + t.getMessage());
346         }
347     }
348 }