2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
28 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
29 import org.opendaylight.openflowplugin.api.ConnectionException;
30 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
34 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
39 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
40 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
41 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
42 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
45 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
53 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
54 private static final String CONNECTION_CLOSED = "Connection closed.";
56 private final ItemLifecycleListener itemLifeCycleListener;
57 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
58 private final DeviceContext deviceContext;
59 private final DeviceState devState;
60 private final boolean isStatisticsPollingOn;
61 private final ConvertorExecutor convertorExecutor;
62 private final MultipartWriterProvider statisticsWriterProvider;
63 private final DeviceInfo deviceInfo;
64 private final TimeCounter timeCounter = new TimeCounter();
65 private final long statisticsPollingInterval;
66 private final long maximumPollingDelay;
67 private final boolean isUsingReconciliationFramework;
68 private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
69 private final AtomicReference<ListenableFuture<Boolean>> lastDataGathering = new AtomicReference<>();
70 private final AtomicReference<StatisticsPollingService> statisticsPollingService = new AtomicReference<>();
71 private List<MultipartType> collectingStatType;
72 private StatisticsGatheringService<T> statisticsGatheringService;
73 private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74 private ContextChainMastershipWatcher contextChainMastershipWatcher;
76 StatisticsContextImpl(@Nonnull final DeviceContext deviceContext,
77 @Nonnull final ConvertorExecutor convertorExecutor,
78 @Nonnull final MultipartWriterProvider statisticsWriterProvider,
79 boolean isStatisticsPollingOn,
80 boolean isUsingReconciliationFramework,
81 long statisticsPollingInterval,
82 long maximumPollingDelay) {
83 this.deviceContext = deviceContext;
84 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
85 this.isStatisticsPollingOn = isStatisticsPollingOn;
86 this.convertorExecutor = convertorExecutor;
87 this.itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
88 this.deviceInfo = deviceContext.getDeviceInfo();
89 this.statisticsPollingInterval = statisticsPollingInterval;
90 this.maximumPollingDelay = maximumPollingDelay;
91 this.statisticsWriterProvider = statisticsWriterProvider;
92 this.isUsingReconciliationFramework = isUsingReconciliationFramework;
94 statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
95 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
96 deviceContext, convertorExecutor, statisticsWriterProvider);
100 public DeviceInfo getDeviceInfo() {
101 return this.deviceInfo;
106 public ServiceGroupIdentifier getIdentifier() {
107 return deviceInfo.getServiceIdentifier();
111 public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
112 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
116 public <O> RequestContext<O> createRequestContext() {
117 final AbstractRequestContext<O> ret = new AbstractRequestContext<O>(deviceInfo.reserveXidForDeviceMessage()) {
119 public void close() {
120 requestContexts.remove(this);
124 requestContexts.add(ret);
129 public void enableGathering() {
130 this.schedulingEnabled.set(true);
131 deviceContext.getItemLifeCycleSourceRegistry()
132 .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource
133 .setItemLifecycleListener(null));
137 public void disableGathering() {
138 this.schedulingEnabled.set(false);
139 deviceContext.getItemLifeCycleSourceRegistry()
140 .getLifeCycleSources().forEach(itemLifeCycleSource -> itemLifeCycleSource
141 .setItemLifecycleListener(itemLifeCycleListener));
145 public void continueInitializationAfterReconciliation() {
146 if (deviceContext.initialSubmitTransaction()) {
147 contextChainMastershipWatcher.onMasterRoleAcquired(
149 ContextChainMastershipState.INITIAL_SUBMIT);
151 startGatheringData();
153 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
155 "Initial transaction cannot be submitted.");
160 public void instantiateServiceInstance() {
161 final List<MultipartType> statListForCollecting = new ArrayList<>();
163 if (devState.isTableStatisticsAvailable()) {
164 statListForCollecting.add(MultipartType.OFPMPTABLE);
167 if (devState.isFlowStatisticsAvailable()) {
168 statListForCollecting.add(MultipartType.OFPMPFLOW);
171 if (devState.isGroupAvailable()) {
172 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
173 statListForCollecting.add(MultipartType.OFPMPGROUP);
176 if (devState.isMetersAvailable()) {
177 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
178 statListForCollecting.add(MultipartType.OFPMPMETER);
181 if (devState.isPortStatisticsAvailable()) {
182 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
185 if (devState.isQueueStatisticsAvailable()) {
186 statListForCollecting.add(MultipartType.OFPMPQUEUE);
189 collectingStatType = ImmutableList.copyOf(statListForCollecting);
190 Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback());
194 public ListenableFuture<Void> closeServiceInstance() {
195 return stopGatheringData();
199 public void close() {
200 Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
202 public void onSuccess(@Nullable final Void result) {
203 requestContexts.forEach(requestContext -> RequestContextUtil
204 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
208 public void onFailure(final Throwable t) {
209 requestContexts.forEach(requestContext -> RequestContextUtil
210 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
215 private ListenableFuture<Boolean> gatherDynamicData() {
216 if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
217 LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
218 return Futures.immediateFuture(Boolean.TRUE);
221 return this.lastDataGathering.updateAndGet(future -> {
222 // write start timestamp to state snapshot container
223 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
225 // recreate gathering future if it should be recreated
226 final ListenableFuture<Boolean> lastDataGathering = Objects.isNull(future) ||
227 future.isCancelled() ||
229 Futures.immediateFuture(Boolean.TRUE) :
232 // build statistics gathering future
233 final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream().reduce(
235 this::statChainFuture,
236 (a, b) -> Futures.transformAsync(a, result -> b));
238 // write end timestamp to state snapshot container
239 Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
241 public void onSuccess(final Boolean result) {
242 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
246 public void onFailure(final Throwable t) {
247 if (!(t instanceof TransactionChainClosedException)) {
248 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
253 return newDataGathering;
257 private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
258 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
259 final String errMsg = String
260 .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
261 getDeviceInfo().getNodeId(),
262 deviceContext.getPrimaryConnectionContext().getConnectionState());
264 return Futures.immediateFailedFuture(new ConnectionException(errMsg));
267 return Futures.transformAsync(prevFuture, result -> {
268 LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
269 LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
270 final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
271 final boolean supported = collectingStatType.contains(multipartType);
273 // TODO: Refactor twice sending deviceContext into gatheringStatistics
274 return supported ? StatisticsGatheringUtils.gatherStatistics(
275 onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
281 statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE);
285 private void startGatheringData() {
286 if (!isStatisticsPollingOn) {
290 LOG.info("Starting statistics gathering for node {}", deviceInfo);
291 final StatisticsPollingService statisticsPollingService = new StatisticsPollingService(timeCounter,
292 statisticsPollingInterval, maximumPollingDelay,
293 StatisticsContextImpl.this::gatherDynamicData);
295 schedulingEnabled.set(true);
296 statisticsPollingService.startAsync();
297 this.statisticsPollingService.set(statisticsPollingService);
300 private ListenableFuture<Void> stopGatheringData() {
301 LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
302 cancelLastDataGathering();
305 .ofNullable(statisticsPollingService.getAndSet(null))
306 .map(StatisticsPollingService::stop)
307 .orElseGet(() -> Futures.immediateFuture(null));
310 private void cancelLastDataGathering() {
311 final ListenableFuture<Boolean> future = lastDataGathering.getAndSet(null);
313 if (Objects.nonNull(future) && !future.isDone() && !future.isCancelled()) {
319 void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
320 this.statisticsGatheringService = statisticsGatheringService;
324 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
325 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
328 private final class InitialSubmitCallback implements FutureCallback<Boolean> {
330 public void onSuccess(@Nullable final Boolean result) {
331 contextChainMastershipWatcher.onMasterRoleAcquired(
333 ContextChainMastershipState.INITIAL_GATHERING
336 if (!isUsingReconciliationFramework) {
337 continueInitializationAfterReconciliation();
342 public void onFailure(@Nonnull final Throwable t) {
343 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
345 "Initial gathering statistics unsuccessful: " + t.getMessage());