2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.ConnectionException;
34 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
38 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
39 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
42 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
43 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
44 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
45 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
46 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
47 import org.opendaylight.openflowplugin.impl.services.util.RequestContextUtil;
48 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
49 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
50 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
51 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
59 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
60 private static final String CONNECTION_CLOSED = "Connection closed.";
62 private final ItemLifecycleListener itemLifeCycleListener;
63 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
64 private final DeviceContext deviceContext;
65 private final DeviceState devState;
66 private final ListenableFuture<Boolean> emptyFuture;
67 private final boolean isStatisticsPollingOn;
68 private final Object collectionStatTypeLock = new Object();
69 private final ConvertorExecutor convertorExecutor;
70 private final MultipartWriterProvider statisticsWriterProvider;
71 @GuardedBy("collectionStatTypeLock")
72 private List<MultipartType> collectingStatType;
74 private StatisticsGatheringService<T> statisticsGatheringService;
75 private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
76 private Timeout pollTimeout;
77 private final DeviceInfo deviceInfo;
78 private final StatisticsManager myManager;
80 private volatile boolean schedulingEnabled;
81 private volatile CONTEXT_STATE state;
82 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
83 private ClusterInitializationPhaseHandler initialSubmitHandler;
85 private ListenableFuture<Boolean> lastDataGathering;
87 StatisticsContextImpl(final boolean isStatisticsPollingOn,
88 @Nonnull final DeviceContext deviceContext,
89 @Nonnull final ConvertorExecutor convertorExecutor,
90 @Nonnull final StatisticsManager myManager,
91 @Nonnull final MultipartWriterProvider statisticsWriterProvider) {
92 this.deviceContext = deviceContext;
93 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
94 this.isStatisticsPollingOn = isStatisticsPollingOn;
95 this.convertorExecutor = convertorExecutor;
96 emptyFuture = Futures.immediateFuture(false);
97 statisticsGatheringService = new StatisticsGatheringService<>(this, deviceContext);
98 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService<>(this,
99 deviceContext, convertorExecutor, statisticsWriterProvider);
100 itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
101 statListForCollectingInitialization();
102 this.state = CONTEXT_STATE.INITIALIZATION;
103 this.deviceInfo = deviceContext.getDeviceInfo();
104 this.myManager = myManager;
105 this.lastDataGathering = null;
106 this.statisticsWriterProvider = statisticsWriterProvider;
110 public void statListForCollectingInitialization() {
111 synchronized (collectionStatTypeLock) {
112 final List<MultipartType> statListForCollecting = new ArrayList<>();
113 if (devState.isTableStatisticsAvailable()) {
114 statListForCollecting.add(MultipartType.OFPMPTABLE);
116 if (devState.isFlowStatisticsAvailable()) {
117 statListForCollecting.add(MultipartType.OFPMPFLOW);
119 if (devState.isGroupAvailable()) {
120 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
121 statListForCollecting.add(MultipartType.OFPMPGROUP);
123 if (devState.isMetersAvailable()) {
124 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
125 statListForCollecting.add(MultipartType.OFPMPMETER);
127 if (devState.isPortStatisticsAvailable()) {
128 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
130 if (devState.isQueueStatisticsAvailable()) {
131 statListForCollecting.add(MultipartType.OFPMPQUEUE);
133 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
139 public ListenableFuture<Boolean> initialGatherDynamicData() {
140 return gatherDynamicData(true);
144 public ListenableFuture<Boolean> gatherDynamicData(){
145 return gatherDynamicData(false);
148 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
149 this.lastDataGathering = null;
150 if (!isStatisticsPollingOn) {
151 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
152 return Futures.immediateFuture(Boolean.TRUE);
154 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
155 if (errorResultFuture != null) {
156 return errorResultFuture;
158 synchronized (collectionStatTypeLock) {
159 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
160 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
162 // write start timestamp to state snapshot container
163 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
165 statChainFuture(statIterator, settableStatResultFuture, initial);
167 // write end timestamp to state snapshot container
168 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
170 public void onSuccess(@Nullable final Boolean result) {
171 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
174 public void onFailure(final Throwable t) {
175 if (!(t instanceof TransactionChainClosedException)) {
176 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
180 this.lastDataGathering = settableStatResultFuture;
181 return settableStatResultFuture;
185 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
186 ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
188 switch (multipartType) {
190 result = collectFlowStatistics(multipartType, initial);
193 result = collectTableStatistics(multipartType);
196 result = collectPortStatistics(multipartType);
199 result = collectQueueStatistics(multipartType);
202 result = collectGroupDescStatistics(multipartType);
205 result = collectGroupStatistics(multipartType);
207 case OFPMPMETERCONFIG:
208 result = collectMeterConfigStatistics(multipartType);
211 result = collectMeterStatistics(multipartType);
214 LOG.warn("Unsupported Statistics type {}", multipartType);
222 public <T> RequestContext<T> createRequestContext() {
223 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
225 public void close() {
226 requestContexts.remove(this);
229 requestContexts.add(ret);
234 public void close() {
235 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
236 if (LOG.isDebugEnabled()) {
237 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
241 this.state = CONTEXT_STATE.TERMINATION;
243 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
244 iterator.hasNext(); ) {
245 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
248 if (null != pollTimeout && !pollTimeout.isExpired()) {
249 pollTimeout.cancel();
255 public void setSchedulingEnabled(final boolean schedulingEnabled) {
256 this.schedulingEnabled = schedulingEnabled;
260 public boolean isSchedulingEnabled() {
261 return schedulingEnabled;
265 public void setPollTimeout(final Timeout pollTimeout) {
266 this.pollTimeout = pollTimeout;
270 public Optional<Timeout> getPollTimeout() {
271 return Optional.ofNullable(pollTimeout);
274 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
275 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
276 final String errMsg = String.format("Device connection is closed for Node : %s.",
277 getDeviceInfo().getNodeId());
279 resultFuture.setException(new ConnectionException(errMsg));
283 if (!iterator.hasNext()) {
285 Futures.addCallback(StatisticsGatheringUtils.gatherStatistics(
286 statisticsGatheringService,
288 MultipartType.OFPMPPORTDESC,
293 statisticsWriterProvider), new FutureCallback<Boolean>() {
295 public void onSuccess(final Boolean result) {
296 statChainFuture(iterator, resultFuture, false);
299 public void onFailure(@Nonnull final Throwable t) {
300 resultFuture.setException(t);
307 resultFuture.set(Boolean.TRUE);
308 LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
312 final MultipartType nextType = iterator.next();
313 LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
315 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
316 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
318 public void onSuccess(final Boolean result) {
319 statChainFuture(iterator, resultFuture, initial);
322 public void onFailure(@Nonnull final Throwable t) {
323 resultFuture.setException(t);
 * Checks the device connection state. Returns {@code null} when the caller may continue; otherwise returns
 * an immediate future that the caller has to return as its own result.
335 ListenableFuture<Boolean> deviceConnectionCheck() {
336 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
337 ListenableFuture<Boolean> resultingFuture;
338 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
340 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
341 deviceContext.getPrimaryConnectionContext().getConnectionState());
342 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
345 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
348 return resultingFuture;
// TODO: Refactor gatherStatistics so that deviceContext does not have to be passed to it twice
354 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
355 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
356 statisticsGatheringOnTheFlyService,
358 /*MultipartType.OFPMPFLOW*/ multipartType,
363 statisticsWriterProvider) : emptyFuture;
366 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
367 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
368 statisticsGatheringService,
370 /*MultipartType.OFPMPTABLE*/ multipartType,
375 statisticsWriterProvider) : emptyFuture;
378 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
379 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
380 statisticsGatheringService,
382 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
387 statisticsWriterProvider) : emptyFuture;
390 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
391 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
392 statisticsGatheringService,
394 /*MultipartType.OFPMPQUEUE*/ multipartType,
399 statisticsWriterProvider);
402 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
403 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
404 statisticsGatheringService,
406 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
411 statisticsWriterProvider) : emptyFuture;
414 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
415 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
416 statisticsGatheringService,
418 /*MultipartType.OFPMPGROUP*/ multipartType,
423 statisticsWriterProvider) : emptyFuture;
426 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
427 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
428 statisticsGatheringService,
430 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
435 statisticsWriterProvider) : emptyFuture;
438 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
439 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
440 statisticsGatheringService,
442 /*MultipartType.OFPMPMETER*/ multipartType,
447 statisticsWriterProvider) : emptyFuture;
451 void setStatisticsGatheringService(final StatisticsGatheringService<T> statisticsGatheringService) {
452 this.statisticsGatheringService = statisticsGatheringService;
456 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService) {
457 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
461 public ItemLifecycleListener getItemLifeCycleListener () {
462 return itemLifeCycleListener;
466 public CONTEXT_STATE getState() {
471 public ServiceGroupIdentifier getServiceIdentifier() {
472 return this.deviceInfo.getServiceIdentifier();
476 public DeviceInfo getDeviceInfo() {
477 return this.deviceInfo;
481 public ListenableFuture<Void> stopClusterServices() {
482 if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
483 return Futures.immediateCancelledFuture();
486 return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
489 public Void apply(@Nullable Object input) {
490 schedulingEnabled = false;
498 public DeviceState gainDeviceState() {
499 return gainDeviceContext().getDeviceState();
503 public DeviceContext gainDeviceContext() {
504 return this.deviceContext;
508 public void stopGatheringData() {
509 if (Objects.nonNull(this.lastDataGathering)) {
510 if (LOG.isDebugEnabled()) {
511 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
514 lastDataGathering.cancel(true);
519 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
520 this.clusterInitializationPhaseHandler = handler;
524 public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
526 LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
528 this.statListForCollectingInitialization();
529 Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
532 public void onSuccess(@Nullable Boolean aBoolean) {
533 mastershipChangeListener.onMasterRoleAcquired(
535 ContextChainMastershipState.INITIAL_GATHERING
537 if (initialSubmitHandler.initialSubmitTransaction()) {
538 mastershipChangeListener.onMasterRoleAcquired(
540 ContextChainMastershipState.INITIAL_SUBMIT
542 if (isStatisticsPollingOn) {
543 myManager.startScheduling(deviceInfo);
546 mastershipChangeListener.onNotAbleToStartMastership(
548 "Initial transaction cannot be submitted."
554 public void onFailure(@Nonnull Throwable throwable) {
555 mastershipChangeListener.onNotAbleToStartMastership(
557 "Initial gathering statistics unsuccessful."
562 return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
566 public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
567 this.initialSubmitHandler = initialSubmitHandler;