2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterators;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import io.netty.util.Timeout;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Objects;
27 import java.util.Optional;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
32 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
33 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
37 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
39 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
40 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
43 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
44 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
45 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
46 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
47 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
48 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 class StatisticsContextImpl implements StatisticsContext {
55 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
56 private static final String CONNECTION_CLOSED = "Connection closed.";
58 private final ItemLifecycleListener itemLifeCycleListener;
59 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
60 private final DeviceContext deviceContext;
61 private final DeviceState devState;
62 private final ListenableFuture<Boolean> emptyFuture;
63 private final boolean isStatisticsPollingOn;
64 private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
65 private final Object collectionStatTypeLock = new Object();
66 @GuardedBy("collectionStatTypeLock")
67 private List<MultipartType> collectingStatType;
69 private StatisticsGatheringService statisticsGatheringService;
70 private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
71 private Timeout pollTimeout;
72 private final DeviceInfo deviceInfo;
73 private final StatisticsManager myManager;
74 private final LifecycleService lifecycleService;
76 private volatile boolean schedulingEnabled;
77 private volatile CONTEXT_STATE state;
78 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
79 private ClusterInitializationPhaseHandler initialSubmitHandler;
81 private ListenableFuture<Boolean> lastDataGathering;
83 StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
84 final boolean isStatisticsPollingOn,
85 @Nonnull final LifecycleService lifecycleService,
86 @Nonnull final ConvertorExecutor convertorExecutor,
87 @Nonnull final StatisticsManager myManager) {
88 this.lifecycleService = lifecycleService;
89 this.deviceContext = lifecycleService.getDeviceContext();
90 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
91 this.isStatisticsPollingOn = isStatisticsPollingOn;
92 multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
93 emptyFuture = Futures.immediateFuture(false);
94 statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
95 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
96 itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
97 statListForCollectingInitialization();
98 this.state = CONTEXT_STATE.INITIALIZATION;
99 this.deviceInfo = deviceInfo;
100 this.myManager = myManager;
101 this.lastDataGathering = null;
105 public void statListForCollectingInitialization() {
106 synchronized (collectionStatTypeLock) {
107 final List<MultipartType> statListForCollecting = new ArrayList<>();
108 if (devState.isTableStatisticsAvailable()) {
109 statListForCollecting.add(MultipartType.OFPMPTABLE);
111 if (devState.isFlowStatisticsAvailable()) {
112 statListForCollecting.add(MultipartType.OFPMPFLOW);
114 if (devState.isGroupAvailable()) {
115 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
116 statListForCollecting.add(MultipartType.OFPMPGROUP);
118 if (devState.isMetersAvailable()) {
119 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
120 statListForCollecting.add(MultipartType.OFPMPMETER);
122 if (devState.isPortStatisticsAvailable()) {
123 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
125 if (devState.isQueueStatisticsAvailable()) {
126 statListForCollecting.add(MultipartType.OFPMPQUEUE);
128 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
134 public ListenableFuture<Boolean> initialGatherDynamicData() {
135 return gatherDynamicData(true);
139 public ListenableFuture<Boolean> gatherDynamicData(){
140 return gatherDynamicData(false);
143 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
144 this.lastDataGathering = null;
145 if (!isStatisticsPollingOn) {
146 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
147 return Futures.immediateFuture(Boolean.TRUE);
149 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
150 if (errorResultFuture != null) {
151 return errorResultFuture;
153 synchronized (collectionStatTypeLock) {
154 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
155 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
157 // write start timestamp to state snapshot container
158 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
160 statChainFuture(statIterator, settableStatResultFuture, initial);
162 // write end timestamp to state snapshot container
163 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
165 public void onSuccess(@Nullable final Boolean result) {
166 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
169 public void onFailure(final Throwable t) {
170 if (!(t instanceof TransactionChainClosedException)) {
171 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
175 this.lastDataGathering = settableStatResultFuture;
176 return settableStatResultFuture;
180 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
181 ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
183 switch (multipartType) {
185 result = collectFlowStatistics(multipartType, initial);
188 result = collectTableStatistics(multipartType);
191 result = collectPortStatistics(multipartType);
194 result = collectQueueStatistics(multipartType);
197 result = collectGroupDescStatistics(multipartType);
200 result = collectGroupStatistics(multipartType);
202 case OFPMPMETERCONFIG:
203 result = collectMeterConfigStatistics(multipartType);
206 result = collectMeterStatistics(multipartType);
209 LOG.warn("Unsupported Statistics type {}", multipartType);
217 public <T> RequestContext<T> createRequestContext() {
218 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
220 public void close() {
221 requestContexts.remove(this);
224 requestContexts.add(ret);
229 public void close() {
230 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
231 if (LOG.isDebugEnabled()) {
232 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
236 stopClusterServices(true).get();
237 } catch (Exception e) {
238 LOG.debug("Failed to close StatisticsContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
241 this.state = CONTEXT_STATE.TERMINATION;
243 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
244 iterator.hasNext(); ) {
245 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
248 if (null != pollTimeout && !pollTimeout.isExpired()) {
249 pollTimeout.cancel();
255 public void setSchedulingEnabled(final boolean schedulingEnabled) {
256 this.schedulingEnabled = schedulingEnabled;
260 public boolean isSchedulingEnabled() {
261 return schedulingEnabled;
265 public void setPollTimeout(final Timeout pollTimeout) {
266 this.pollTimeout = pollTimeout;
270 public Optional<Timeout> getPollTimeout() {
271 return Optional.ofNullable(pollTimeout);
274 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
275 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
276 final String errMsg = String.format("Device connection is closed for Node : %s.",
277 getDeviceInfo().getNodeId());
279 resultFuture.setException(new IllegalStateException(errMsg));
282 if ( ! iterator.hasNext()) {
283 resultFuture.set(Boolean.TRUE);
284 LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
288 final MultipartType nextType = iterator.next();
289 LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
291 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
292 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
294 public void onSuccess(final Boolean result) {
295 statChainFuture(iterator, resultFuture, initial);
298 public void onFailure(@Nonnull final Throwable t) {
299 resultFuture.setException(t);
305 * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
306 * which has to be returned from caller too
311 ListenableFuture<Boolean> deviceConnectionCheck() {
312 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
313 ListenableFuture<Boolean> resultingFuture;
314 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
316 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
317 deviceContext.getPrimaryConnectionContext().getConnectionState());
318 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
321 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
324 return resultingFuture;
329 //TODO: Refactor twice sending deviceContext into gatheringStatistics
330 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
331 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
332 statisticsGatheringOnTheFlyService,
334 /*MultipartType.OFPMPFLOW*/ multipartType,
337 initial, multipartReplyTranslator) : emptyFuture;
340 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
341 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
342 statisticsGatheringService,
344 /*MultipartType.OFPMPTABLE*/ multipartType,
347 false, multipartReplyTranslator) : emptyFuture;
350 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
351 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
352 statisticsGatheringService,
354 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
357 false, multipartReplyTranslator) : emptyFuture;
360 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
361 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
362 statisticsGatheringService,
364 /*MultipartType.OFPMPQUEUE*/ multipartType,
367 false, multipartReplyTranslator);
370 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
371 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
372 statisticsGatheringService,
374 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
377 false, multipartReplyTranslator) : emptyFuture;
380 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
381 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
382 statisticsGatheringService,
384 /*MultipartType.OFPMPGROUP*/ multipartType,
387 false, multipartReplyTranslator) : emptyFuture;
390 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
391 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
392 statisticsGatheringService,
394 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
397 false, multipartReplyTranslator) : emptyFuture;
400 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
401 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
402 statisticsGatheringService,
404 /*MultipartType.OFPMPMETER*/ multipartType,
407 false, multipartReplyTranslator) : emptyFuture;
411 void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
412 this.statisticsGatheringService = statisticsGatheringService;
416 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
417 statisticsGatheringOnTheFlyService) {
418 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
422 public ItemLifecycleListener getItemLifeCycleListener () {
423 return itemLifeCycleListener;
427 public CONTEXT_STATE getState() {
432 public ServiceGroupIdentifier getServiceIdentifier() {
433 return this.deviceInfo.getServiceIdentifier();
437 public DeviceInfo getDeviceInfo() {
438 return this.deviceInfo;
442 public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
443 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
444 return Futures.immediateCancelledFuture();
447 return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
450 public Void apply(@Nullable Object input) {
451 schedulingEnabled = false;
459 public DeviceState gainDeviceState() {
460 return gainDeviceContext().getDeviceState();
464 public DeviceContext gainDeviceContext() {
465 return this.lifecycleService.getDeviceContext();
469 public void stopGatheringData() {
470 if (Objects.nonNull(this.lastDataGathering)) {
471 if (LOG.isDebugEnabled()) {
472 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
475 lastDataGathering.cancel(true);
480 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
481 this.clusterInitializationPhaseHandler = handler;
485 public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
486 if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
487 LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
491 LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
493 this.statListForCollectingInitialization();
494 Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
497 public void onSuccess(@Nullable Boolean aBoolean) {
498 initialSubmitHandler.initialSubmitTransaction();
502 public void onFailure(Throwable throwable) {
503 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
504 lifecycleService.closeConnection();
508 if (this.isStatisticsPollingOn) {
509 myManager.startScheduling(deviceInfo);
512 return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
516 public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
517 this.initialSubmitHandler = initialSubmitHandler;