2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.AsyncFunction;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.ConnectionException;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
37 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
41 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
44 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
45 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
46 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
47 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
48 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
49 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
50 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
58 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
59 private static final String CONNECTION_CLOSED = "Connection closed.";
61 private final ItemLifecycleListener itemLifeCycleListener;
62 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
63 private final DeviceContext deviceContext;
64 private final DeviceState devState;
65 private final boolean isStatisticsPollingOn;
66 private final Object collectionStatTypeLock = new Object();
67 private final ConvertorExecutor convertorExecutor;
68 private final MultipartWriterProvider statisticsWriterProvider;
69 @GuardedBy("collectionStatTypeLock")
70 private List<MultipartType> collectingStatType;
72 private StatisticsGatheringService<T> statisticsGatheringService;
73 private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
74 private Timeout pollTimeout;
75 private final DeviceInfo deviceInfo;
76 private final StatisticsManager myManager;
78 private volatile boolean schedulingEnabled;
79 private volatile ContextState state;
80 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
81 private ClusterInitializationPhaseHandler initialSubmitHandler;
83 private volatile ListenableFuture<Boolean> lastDataGathering;
85 StatisticsContextImpl(final boolean isStatisticsPollingOn,
86 @Nonnull final DeviceContext deviceContext,
87 @Nonnull final ConvertorExecutor convertorExecutor,
88 @Nonnull final StatisticsManager myManager,
89 @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
90 this.deviceContext = deviceContext;
91 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
92 this.isStatisticsPollingOn = isStatisticsPollingOn;
93 this.convertorExecutor = convertorExecutor;
94 statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
95 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
96 deviceContext, convertorExecutor, statisticsWriterProvider);
97 itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
98 statListForCollectingInitialization();
99 this.state = ContextState.INITIALIZATION;
100 this.deviceInfo = deviceContext.getDeviceInfo();
101 this.myManager = myManager;
102 this.lastDataGathering = null;
103 this.statisticsWriterProvider = statisticsWriterProvider;
107 public void statListForCollectingInitialization() {
108 synchronized (collectionStatTypeLock) {
109 final List<MultipartType> statListForCollecting = new ArrayList<>();
110 if (devState.isTableStatisticsAvailable()) {
111 statListForCollecting.add(MultipartType.OFPMPTABLE);
113 if (devState.isFlowStatisticsAvailable()) {
114 statListForCollecting.add(MultipartType.OFPMPFLOW);
116 if (devState.isGroupAvailable()) {
117 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
118 statListForCollecting.add(MultipartType.OFPMPGROUP);
120 if (devState.isMetersAvailable()) {
121 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
122 statListForCollecting.add(MultipartType.OFPMPMETER);
124 if (devState.isPortStatisticsAvailable()) {
125 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
127 if (devState.isQueueStatisticsAvailable()) {
128 statListForCollecting.add(MultipartType.OFPMPQUEUE);
130 collectingStatType = ImmutableList.copyOf(statListForCollecting);
135 public ListenableFuture<Boolean> gatherDynamicData() {
136 if (!isStatisticsPollingOn) {
137 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
138 return Futures.immediateFuture(Boolean.TRUE);
141 if (Objects.isNull(lastDataGathering)
142 || lastDataGathering.isCancelled()
143 || lastDataGathering.isDone()) {
144 lastDataGathering = Futures.immediateFuture(Boolean.TRUE);
147 synchronized (collectionStatTypeLock) {
148 // write start timestamp to state snapshot container
149 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
151 lastDataGathering = collectingStatType.stream().reduce(
153 this::statChainFuture,
154 (a, b) -> Futures.transformAsync(a, (AsyncFunction<Boolean, Boolean>) result -> b));
156 // write end timestamp to state snapshot container
157 Futures.addCallback(lastDataGathering, new FutureCallback<Boolean>() {
159 public void onSuccess(final Boolean result) {
160 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, result);
164 public void onFailure(final Throwable t) {
165 if (!(t instanceof TransactionChainClosedException)) {
166 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
172 return lastDataGathering;
175 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
176 ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
178 switch (multipartType) {
180 result = collectStatistics(multipartType, devState.isFlowStatisticsAvailable(), true);
183 result = collectStatistics(multipartType, devState.isTableStatisticsAvailable(), false);
186 result = collectStatistics(multipartType, devState.isPortStatisticsAvailable(), false);
189 result = collectStatistics(multipartType, devState.isQueueStatisticsAvailable(), false);
192 result = collectStatistics(multipartType, devState.isGroupAvailable(), false);
195 result = collectStatistics(multipartType, devState.isGroupAvailable(), false);
197 case OFPMPMETERCONFIG:
198 result = collectStatistics(multipartType, devState.isMetersAvailable(), false);
201 result = collectStatistics(multipartType, devState.isMetersAvailable(), false);
204 LOG.warn("Unsupported Statistics type {}", multipartType);
212 public <T> RequestContext<T> createRequestContext() {
213 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
215 public void close() {
216 requestContexts.remove(this);
219 requestContexts.add(ret);
224 public void close() {
225 if (ContextState.TERMINATION.equals(state)) {
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
230 this.state = ContextState.TERMINATION;
233 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
234 iterator.hasNext(); ) {
235 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
241 public void setSchedulingEnabled(final boolean schedulingEnabled) {
242 this.schedulingEnabled = schedulingEnabled;
246 public boolean isSchedulingEnabled() {
247 return schedulingEnabled;
251 public void setPollTimeout(final Timeout pollTimeout) {
252 this.pollTimeout = pollTimeout;
255 private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
256 return Futures.transformAsync(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
257 .transformAsync(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
258 LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result);
259 LOG.debug("Stats iterating to next type for node {} of type {}",
260 deviceInfo.getLOGValue(),
263 return chooseStat(multipartType);
268 ListenableFuture<Boolean> deviceConnectionCheck() {
269 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
270 final String errMsg = String
271 .format("Device connection for node %s doesn't exist anymore. Primary connection status : %s",
272 getDeviceInfo().getNodeId(),
273 deviceContext.getPrimaryConnectionContext().getConnectionState());
275 return Futures.immediateFailedFuture(new ConnectionException(errMsg));
278 return Futures.immediateFuture(Boolean.TRUE);
281 private ListenableFuture<Boolean> collectStatistics(final MultipartType multipartType,
282 final boolean supported,
283 final boolean onTheFly) {
284 // TODO: Refactor twice sending deviceContext into gatheringStatistics
285 return supported ? StatisticsGatheringUtils.gatherStatistics(
286 onTheFly ? statisticsGatheringOnTheFlyService : statisticsGatheringService,
292 statisticsWriterProvider) : Futures.immediateFuture(Boolean.FALSE);
296 void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
297 this.statisticsGatheringService = statisticsGatheringService;
301 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
302 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
306 public ItemLifecycleListener getItemLifeCycleListener () {
307 return itemLifeCycleListener;
311 public ServiceGroupIdentifier getServiceIdentifier() {
312 return this.deviceInfo.getServiceIdentifier();
316 public DeviceInfo getDeviceInfo() {
317 return this.deviceInfo;
321 public ListenableFuture<Void> stopClusterServices() {
322 if (ContextState.TERMINATION.equals(this.state)) {
323 return Futures.immediateCancelledFuture();
326 return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
329 public Void apply(@Nullable Object input) {
330 schedulingEnabled = false;
338 public DeviceState gainDeviceState() {
339 return gainDeviceContext().getDeviceState();
343 public DeviceContext gainDeviceContext() {
344 return this.deviceContext;
348 public void stopGatheringData() {
349 LOG.info("Stopping running statistics gathering for node {}", deviceInfo.getLOGValue());
351 if (Objects.nonNull(lastDataGathering) && !lastDataGathering.isDone() && !lastDataGathering.isCancelled()) {
352 lastDataGathering.cancel(true);
355 if (Objects.nonNull(pollTimeout) && !pollTimeout.isExpired()) {
356 pollTimeout.cancel();
361 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
362 this.clusterInitializationPhaseHandler = handler;
366 public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
367 LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
368 this.statListForCollectingInitialization();
370 Futures.addCallback(this.gatherDynamicData(), new FutureCallback<Boolean>() {
372 public void onSuccess(@Nullable Boolean aBoolean) {
373 mastershipChangeListener.onMasterRoleAcquired(
375 ContextChainMastershipState.INITIAL_GATHERING
378 if (initialSubmitHandler.initialSubmitTransaction()) {
379 mastershipChangeListener.onMasterRoleAcquired(
381 ContextChainMastershipState.INITIAL_SUBMIT
384 if (isStatisticsPollingOn) {
385 myManager.startScheduling(deviceInfo);
388 mastershipChangeListener.onNotAbleToStartMastershipMandatory(
390 "Initial transaction cannot be submitted."
396 public void onFailure(@Nonnull Throwable throwable) {
397 mastershipChangeListener.onNotAbleToStartMastershipMandatory(
399 "Initial gathering statistics unsuccessful."
404 return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
408 public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
409 this.initialSubmitHandler = initialSubmitHandler;