2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.statistics;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.atomic.AtomicBoolean;
24 import java.util.concurrent.atomic.AtomicReference;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
27 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
28 import org.opendaylight.openflowplugin.api.ConnectionException;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
33 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
39 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
40 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
41 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
42 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
43 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext, DeviceInitializationContext {
52 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
53 private static final String CONNECTION_CLOSED = "Connection closed.";
55 private final Collection<RequestContext<?>> requestContexts = ConcurrentHashMap.newKeySet();
56 private final DeviceContext deviceContext;
57 private final DeviceState devState;
58 private final ListeningExecutorService executorService;
59 private final boolean isStatisticsPollingOn;
60 private final ConvertorExecutor convertorExecutor;
61 private final MultipartWriterProvider statisticsWriterProvider;
62 private final DeviceInfo deviceInfo;
63 private final TimeCounter timeCounter = new TimeCounter();
64 private final OpenflowProviderConfig config;
65 private final long statisticsPollingInterval;
66 private final long maximumPollingDelay;
67 private final boolean isUsingReconciliationFramework;
68 private final AtomicBoolean schedulingEnabled = new AtomicBoolean(true);
69 private final AtomicReference<ListenableFuture<Boolean>> lastDataGatheringRef = new AtomicReference<>();
70 private final AtomicReference<StatisticsPollingService> statisticsPollingServiceRef = new AtomicReference<>();
71 private List<MultipartType> collectingStatType;
72 private StatisticsGatheringService<T> statisticsGatheringService;
73 private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74 private ContextChainMastershipWatcher contextChainMastershipWatcher;
76 StatisticsContextImpl(@NonNull final DeviceContext deviceContext,
77 @NonNull final ConvertorExecutor convertorExecutor,
78 @NonNull final MultipartWriterProvider statisticsWriterProvider,
79 @NonNull final ListeningExecutorService executorService,
80 @NonNull final OpenflowProviderConfig config,
81 final boolean isStatisticsPollingOn,
82 final boolean isUsingReconciliationFramework) {
83 this.deviceContext = deviceContext;
84 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
85 this.executorService = executorService;
86 this.isStatisticsPollingOn = isStatisticsPollingOn;
88 this.convertorExecutor = convertorExecutor;
89 this.deviceInfo = deviceContext.getDeviceInfo();
90 this.statisticsPollingInterval = config.getBasicTimerDelay().getValue().toJava();
91 this.maximumPollingDelay = config.getMaximumTimerDelay().getValue().toJava();
92 this.statisticsWriterProvider = statisticsWriterProvider;
93 this.isUsingReconciliationFramework = isUsingReconciliationFramework;
95 statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
96 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this, deviceContext,
98 statisticsWriterProvider);
102 public DeviceInfo getDeviceInfo() {
103 return this.deviceInfo;
108 public ServiceGroupIdentifier getIdentifier() {
109 return deviceInfo.getServiceIdentifier();
113 public void registerMastershipWatcher(@NonNull final ContextChainMastershipWatcher newWatcher) {
114 this.contextChainMastershipWatcher = newWatcher;
118 public <O> RequestContext<O> createRequestContext() {
119 final AbstractRequestContext<O> ret = new AbstractRequestContext<>(deviceInfo.reserveXidForDeviceMessage()) {
121 public void close() {
122 requestContexts.remove(this);
126 requestContexts.add(ret);
131 public void enableGathering() {
132 this.schedulingEnabled.set(true);
136 public void disableGathering() {
137 this.schedulingEnabled.set(false);
141 public void continueInitializationAfterReconciliation() {
142 if (deviceContext.initialSubmitTransaction()) {
143 contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_SUBMIT);
145 startGatheringData();
147 contextChainMastershipWatcher
148 .onNotAbleToStartMastershipMandatory(deviceInfo, "Initial transaction cannot be submitted.");
153 public void instantiateServiceInstance() {
158 public void initializeDevice() {
159 final List<MultipartType> statListForCollecting = new ArrayList<>();
161 if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
162 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
163 statListForCollecting.add(MultipartType.OFPMPGROUP);
166 if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
167 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
168 statListForCollecting.add(MultipartType.OFPMPMETER);
171 if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
172 statListForCollecting.add(MultipartType.OFPMPFLOW);
175 if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
176 statListForCollecting.add(MultipartType.OFPMPTABLE);
179 if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
180 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
183 if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
184 statListForCollecting.add(MultipartType.OFPMPQUEUE);
187 collectingStatType = ImmutableList.copyOf(statListForCollecting);
188 Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
192 public ListenableFuture<Void> closeServiceInstance() {
193 return stopGatheringData();
197 public void close() {
198 Futures.addCallback(stopGatheringData(), new FutureCallback<Void>() {
200 public void onSuccess(final Void result) {
201 requestContexts.forEach(requestContext -> RequestContextUtil
202 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
206 public void onFailure(final Throwable throwable) {
207 requestContexts.forEach(requestContext -> RequestContextUtil
208 .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
210 }, MoreExecutors.directExecutor());
213 private ListenableFuture<Boolean> gatherDynamicData() {
214 if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
215 LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
216 return Futures.immediateFuture(Boolean.TRUE);
219 return this.lastDataGatheringRef.updateAndGet(future -> {
220 // write start timestamp to state snapshot container
221 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);
223 // recreate gathering future if it should be recreated
224 final ListenableFuture<Boolean> lastDataGathering = future == null || future.isCancelled()
225 || future.isDone() ? Futures.immediateFuture(Boolean.TRUE) : future;
227 // build statistics gathering future
228 final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
229 .reduce(lastDataGathering, this::statChainFuture,
230 (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
231 MoreExecutors.directExecutor()));
233 // write end timestamp to state snapshot container
234 Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
236 public void onSuccess(final Boolean result) {
237 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
241 public void onFailure(final Throwable throwable) {
242 if (!(throwable instanceof TransactionChainClosedException)) {
243 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
246 }, MoreExecutors.directExecutor());
248 return newDataGathering;
252 private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture,
253 final MultipartType multipartType) {
254 if (ConnectionContext.CONNECTION_STATE.RIP
255 .equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
256 final String errMsg = String
257 .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
258 getDeviceInfo().getNodeId(),
259 deviceContext.getPrimaryConnectionContext().getConnectionState());
261 return Futures.immediateFailedFuture(new ConnectionException(errMsg));
264 return Futures.transformAsync(prevFuture, result -> {
265 LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo, result);
266 LOG.debug("Stats iterating to next type for node {} of type {}", deviceInfo, multipartType);
267 final boolean onTheFly = MultipartType.OFPMPFLOW.equals(multipartType);
268 final boolean supported = collectingStatType.contains(multipartType);
270 // TODO: Refactor twice sending deviceContext into gatheringStatistics
271 return supported ? StatisticsGatheringUtils
272 .gatherStatistics(onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
273 getDeviceInfo(), multipartType, deviceContext, deviceContext, convertorExecutor,
274 statisticsWriterProvider, executorService) : Futures
275 .immediateFuture(Boolean.FALSE);
276 }, MoreExecutors.directExecutor());
279 private void startGatheringData() {
280 if (!isStatisticsPollingOn) {
284 LOG.info("Starting statistics gathering for node {}", deviceInfo);
285 final StatisticsPollingService statisticsPollingService =
286 new StatisticsPollingService(timeCounter,
287 statisticsPollingInterval,
289 StatisticsContextImpl.this::gatherDynamicData);
291 schedulingEnabled.set(true);
292 statisticsPollingService.startAsync();
293 this.statisticsPollingServiceRef.set(statisticsPollingService);
296 private ListenableFuture<Void> stopGatheringData() {
297 LOG.info("Stopping running statistics gathering for node {}", deviceInfo);
298 cancelLastDataGathering();
300 return Optional.ofNullable(statisticsPollingServiceRef.getAndSet(null)).map(StatisticsPollingService::stop)
301 .orElseGet(() -> Futures.immediateFuture(null));
304 private void cancelLastDataGathering() {
305 final ListenableFuture<Boolean> future = lastDataGatheringRef.getAndSet(null);
307 if (future != null && !future.isDone() && !future.isCancelled()) {
313 void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
314 this.statisticsGatheringService = statisticsGatheringService;
318 void setStatisticsGatheringOnTheFlyService(
319 final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
320 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
323 private final class InitialSubmitCallback implements FutureCallback<Boolean> {
325 public void onSuccess(final Boolean result) {
326 if (!isUsingReconciliationFramework) {
327 continueInitializationAfterReconciliation();
332 public void onFailure(final Throwable throwable) {
333 contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
334 "Initial gathering statistics "
336 + throwable.getMessage());