2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
30 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.ConnectionException;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
37 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
42 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
43 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
55 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
56 private static final String CONNECTION_CLOSED = "Connection closed.";
58 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
59 private final DeviceContext deviceContext;
60 private final DeviceState devState;
61 private final ListeningExecutorService executorService;
62 private final boolean isStatisticsPollingOn;
63 private final ConvertorExecutor convertorExecutor;
64 private final MultipartWriterProvider statisticsWriterProvider;
65 private final DeviceInfo deviceInfo;
66 private final TimeCounter timeCounter = new TimeCounter();
67 private final OpenflowProviderConfig config;
68 private final long statisticsPollingInterval;
69 private final long maximumPollingDelay;
70 private final boolean isUsingReconciliationFramework;
71 private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
72 private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
73 private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
74 private List<MultipartType> collectingStatType;
75 private StatisticsGatheringService<T> statisticsGatheringService;
76 private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
77 private ContextChainMastershipWatcher contextChainMastershipWatcher;
79 StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
80 @Nonnull final ConvertorExecutor convertorExecutor,
81 @Nonnull final MultipartWriterProvider statisticsWriterProvider,
82 @Nonnull final ListeningExecutorService executorService,
83 @Nonnull final OpenflowProviderConfig config,
84 boolean isStatisticsPollingOn,
85 boolean isUsingReconciliationFramework) {
86 this.deviceContext = deviceContext;
87 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
88 this.executorService = executorService;
89 this.isStatisticsPollingOn = isStatisticsPollingOn;
91 this.convertorExecutor = convertorExecutor;
92 this.deviceInfo = deviceContext.getDeviceInfo();
93 this.statisticsPollingInterval = config.getBasicTimerDelay().getValue();
94 this.maximumPollingDelay = config.getMaximumTimerDelay().getValue();
95 this.statisticsWriterProvider = statisticsWriterProvider;
96 this.isUsingReconciliationFramework = isUsingReconciliationFramework;
98 statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
99 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
101 statisticsWriterProvider);
105 public DeviceInfo getDeviceInfo() {
106 return this.deviceInfo;
111 public ServiceGroupIdentifier getIdentifier() {
112 return deviceInfo.getServiceIdentifier();
116 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
117 this.contextChainMastershipWatcher = newWatcher;
121 public <O> RequestContext<O> createRequestContext() {
122 final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
124 public void close() {
125 requestContexts.remove(this);
129 requestContexts.add(ret);
134 public void enableGathering() {
135 this.schedulingEnabled.set(true);
139 public void disableGathering() {
140 this.schedulingEnabled.set(false);
144 public void continueInitializationAfterReconciliation() {
145 if (deviceContext.initialSubmitTransaction()) {
146 contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
148 startGatheringData();
150 contextChainMastershipWatcher
151 .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
156 public void instantiateServiceInstance() {
157 final List<MultipartType> statListForCollecting = new ArrayList<>();
159 if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
160 statListForCollecting.add(MultipartType.OFPMPTABLE);
163 if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
164 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
165 statListForCollecting.add(MultipartType.OFPMPGROUP);
168 if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
169 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
170 statListForCollecting.add(MultipartType.OFPMPMETER);
173 if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
174 statListForCollecting.add(MultipartType.OFPMPFLOW);
177 if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
178 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
181 if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
182 statListForCollecting.add(MultipartType.OFPMPQUEUE);
185 collectingStatType = ImmutableList.copyOf(statListForCollecting);
186 Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
190 public ListenableFuture<Void> closeServiceInstance() {
191 return stopGatheringData();
195 public void close() {
196 Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
198 public void onSuccess(@Nullable final Void result) {
199 requestContexts.forEach(requestContext -> RequestContextUtil
200 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
204 public void onFailure(final Throwable throwable) {
205 requestContexts.forEach(requestContext -> RequestContextUtil
206 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
208 }, MoreExecutors.directExecutor());
211 private ListenableFuture<Boolean> gatherDynamicData() {
212 if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
213 LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
214 return Futures.immediateFuture(Boolean.TRUE);
217 return this.lastDataGatheringRef.updateAndGet(future -> {
218 // write start timestamp to state snapshot container
219 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
221 // recreate gathering future if it should be recreated
222 final ListenableFuture<Boolean> lastDataGathering =
223 Objects.isNull(future) || future.isCancelled() || future.isDone() ? Futures
224 .immediateFuture(Boolean.TRUE) : future;
226 // build statistics gathering future
227 final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
228 .reduce(lastDataGathering, this::statChainFuture,
229 (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
230 MoreExecutors.directExecutor()));
232 // write end timestamp to state snapshot container
233 Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
235 public void onSuccess(@Nonnull final Boolean result) {
236 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
240 public void onFailure(final Throwable throwable) {
241 if (!(throwable instanceof TransactionChainClosedException)) {
242 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
245 }, MoreExecutors.directExecutor());
247 return newDataGathering;
251 private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
252 final MultipartType multipartType) {
253 if (ConnectionContext.CONNECTION_STATE.RIP
254 .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
255 final String errMsg = String
256 .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
257 getDeviceInfo().getNodeId(),
258 deviceContext.getPrimaryConnectionContext().getConnectionState());
260 return Futures.immediateFailedFuture(new ConnectionException(errMsg));
263 return Futures.transformAsync(prevFuture, result -> {
264 LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
265 LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
266 final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
267 final boolean supported = collectingStatType.contains(multipartType);
269 // TODO: Refactor twice sending deviceContext into gatheringStatistics
270 return supported ? StatisticsGatheringUtils
271 .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
272 getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
273 statisticsWriterProvider, executorService) : Futures
274 .immediateFuture(Boolean.FALSE);
275 }, MoreExecutors.directExecutor());
278 private void startGatheringData() {
279 if (!isStatisticsPollingOn) {
283 LOG.info("Starting statistics gathering for node {}", deviceInfo);
284 final StatisticsPollingService statisticsPollingService =
285 new StatisticsPollingService(timeCounter,
286 statisticsPollingInterval,
288 StatisticsContextImpl.this::gatherDynamicData);
290 schedulingEnabled.set(true);
291 statisticsPollingService.startAsync();
292 this.statisticsPollingServiceRef.set(statisticsPollingService);
295 private ListenableFuture<Void> stopGatheringData() {
296 LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
297 cancelLastDataGathering();
299 return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
300 .orElseGet(() -> Futures.immediateFuture(null));
303 private void cancelLastDataGathering() {
304 final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
306 if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
312 void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
313 this.statisticsGatheringService = statisticsGatheringService;
317 void setStatisticsGatheringOnTheFlyService(
318 final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
319 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
322 private final class InitialSubmitCallback implements FutureCallback<Boolean> {
324 public void onSuccess(@Nullable final Boolean result) {
325 contextChainMastershipWatcher
326 .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);
328 if (!isUsingReconciliationFramework) {
329 continueInitializationAfterReconciliation();
334 public void onFailure(@Nonnull final Throwable throwable) {
335 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
336 "Initial gathering statistics "
338 + throwable.getMessage());