2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Objects;
26 import java.util.Optional;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
31 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
32 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
36 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
37 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
38 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
43 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
44 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
46 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
47 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 class StatisticsContextImpl implements StatisticsContext {
54 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
55 private static final String CONNECTION_CLOSED = "Connection closed.";
57 private final ItemLifecycleListener itemLifeCycleListener;
58 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
59 private final DeviceContext deviceContext;
60 private final DeviceState devState;
61 private final ListenableFuture<Boolean> emptyFuture;
62 private final boolean shuttingDownStatisticsPolling;
63 private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
64 private final Object collectionStatTypeLock = new Object();
65 @GuardedBy("collectionStatTypeLock")
66 private List<MultipartType> collectingStatType;
68 private StatisticsGatheringService statisticsGatheringService;
69 private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
70 private Timeout pollTimeout;
71 private final DeviceInfo deviceInfo;
72 private final StatisticsManager myManager;
73 private final LifecycleService lifecycleService;
75 private volatile boolean schedulingEnabled;
76 private volatile CONTEXT_STATE state;
77 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
78 private ClusterInitializationPhaseHandler initialSubmitHandler;
80 private ListenableFuture<Boolean> lastDataGathering;
/**
 * Creates the statistics context of one device.
 *
 * <p>The device context and device state are taken from {@code lifecycleService}; the list of statistic
 * types to collect is built immediately and the context starts in the {@code INITIALIZATION} state.
 *
 * @param deviceInfo identity of the device this context belongs to
 * @param shuttingDownStatisticsPolling when {@code true}, gathering is skipped and scheduling is not started
 * @param lifecycleService source of the device context; also used to close the connection when the
 *                         initial gathering fails
 * @param convertorExecutor passed to the reply translator and to the on-the-fly gathering service
 * @param myManager manager asked to start and stop scheduling for this device
 * @throws NullPointerException if the device context exposes no device state
 */
StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
                      final boolean shuttingDownStatisticsPolling,
                      @Nonnull final LifecycleService lifecycleService,
                      @Nonnull final ConvertorExecutor convertorExecutor,
                      @Nonnull final StatisticsManager myManager) {
    // "this" is handed to the gathering services below, so the fields they could reach through this
    // object (for example createRequestContext() reads deviceInfo) must be assigned before that happens.
    this.deviceInfo = deviceInfo;
    this.myManager = myManager;
    this.lifecycleService = lifecycleService;
    this.deviceContext = lifecycleService.getDeviceContext();
    this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
    this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
    this.multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
    this.emptyFuture = Futures.immediateFuture(false);
    this.lastDataGathering = null;
    this.statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
    this.statisticsGatheringOnTheFlyService =
            new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
    this.itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
    statListForCollectingInitialization();
    setState(CONTEXT_STATE.INITIALIZATION);
}
104 public void statListForCollectingInitialization() {
105 synchronized (collectionStatTypeLock) {
106 final List<MultipartType> statListForCollecting = new ArrayList<>();
107 if (devState.isTableStatisticsAvailable()) {
108 statListForCollecting.add(MultipartType.OFPMPTABLE);
110 if (devState.isFlowStatisticsAvailable()) {
111 statListForCollecting.add(MultipartType.OFPMPFLOW);
113 if (devState.isGroupAvailable()) {
114 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
115 statListForCollecting.add(MultipartType.OFPMPGROUP);
117 if (devState.isMetersAvailable()) {
118 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
119 statListForCollecting.add(MultipartType.OFPMPMETER);
121 if (devState.isPortStatisticsAvailable()) {
122 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
124 if (devState.isQueueStatisticsAvailable()) {
125 statListForCollecting.add(MultipartType.OFPMPQUEUE);
127 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
133 public ListenableFuture<Boolean> initialGatherDynamicData() {
134 return gatherDynamicData(true);
138 public ListenableFuture<Boolean> gatherDynamicData(){
139 return gatherDynamicData(false);
142 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
143 this.lastDataGathering = null;
144 if (shuttingDownStatisticsPolling) {
145 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
146 return Futures.immediateFuture(Boolean.TRUE);
148 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
149 if (errorResultFuture != null) {
150 return errorResultFuture;
152 synchronized (collectionStatTypeLock) {
153 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
154 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
156 // write start timestamp to state snapshot container
157 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
159 statChainFuture(statIterator, settableStatResultFuture, initial);
161 // write end timestamp to state snapshot container
162 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
164 public void onSuccess(@Nullable final Boolean result) {
165 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
168 public void onFailure(final Throwable t) {
169 if (!(t instanceof TransactionChainClosedException)) {
170 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
174 this.lastDataGathering = settableStatResultFuture;
175 return settableStatResultFuture;
179 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
180 ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
182 switch (multipartType) {
184 result = collectFlowStatistics(multipartType, initial);
187 result = collectTableStatistics(multipartType);
190 result = collectPortStatistics(multipartType);
193 result = collectQueueStatistics(multipartType);
196 result = collectGroupDescStatistics(multipartType);
199 result = collectGroupStatistics(multipartType);
201 case OFPMPMETERCONFIG:
202 result = collectMeterConfigStatistics(multipartType);
205 result = collectMeterStatistics(multipartType);
208 LOG.warn("Unsupported Statistics type {}", multipartType);
216 public <T> RequestContext<T> createRequestContext() {
217 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
219 public void close() {
220 requestContexts.remove(this);
223 requestContexts.add(ret);
/**
 * Closes this statistics context.
 *
 * <p>Visible steps: when the context is already in the TERMINATION state a debug message is logged;
 * the state is set to TERMINATION, scheduling is disabled, every registered request context is closed
 * with the {@code CONNECTION_CLOSED} RPC error, and a poll timeout that has not expired is cancelled.
 * NOTE(review): the lines that end the "already TERMINATION" branch are not visible in this listing -
 * presumably the cleanup runs only when the context was not terminated yet; confirm.
 */
228 public void close() {
229 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
230 if (LOG.isDebugEnabled()) {
231 LOG.debug("Statistics context is already in state TERMINATION.");
235 setState(CONTEXT_STATE.TERMINATION);
236 schedulingEnabled = false;
// The consuming iterator removes each request context from requestContexts as it is returned.
237 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
238 iterator.hasNext(); ) {
239 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
// Cancel the poll timeout only when one is set and it has not expired yet.
241 if (null != pollTimeout && !pollTimeout.isExpired()) {
242 pollTimeout.cancel();
248 public void setSchedulingEnabled(final boolean schedulingEnabled) {
249 this.schedulingEnabled = schedulingEnabled;
253 public boolean isSchedulingEnabled() {
254 return schedulingEnabled;
258 public void setPollTimeout(final Timeout pollTimeout) {
259 this.pollTimeout = pollTimeout;
263 public Optional<Timeout> getPollTimeout() {
264 return Optional.ofNullable(pollTimeout);
/**
 * Collects the remaining statistic types one after another and completes {@code resultFuture} at the end.
 *
 * <p>Each call handles at most one type: it asks {@code chooseStat} for the next type and, from the
 * success callback of that request, calls itself again with the same iterator. A failed request fails
 * {@code resultFuture} with the same throwable.
 * NOTE(review): some lines of this method are not visible in this listing; the two early branches
 * presumably return right after completing {@code resultFuture} - confirm.
 *
 * @param iterator remaining statistic types; advanced by one per call
 * @param resultFuture future completed with {@code true} once the iterator is exhausted, or failed
 * @param initial forwarded to {@code chooseStat}
 */
267 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
// A primary connection in the RIP state fails the whole chain with an IllegalStateException.
268 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
269 final String errMsg = String.format("Device connection is closed for Node : %s.",
270 getDeviceInfo().getNodeId());
272 resultFuture.setException(new IllegalStateException(errMsg));
// No types left: the chain finished successfully.
275 if ( ! iterator.hasNext()) {
276 resultFuture.set(Boolean.TRUE);
277 LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getLOGValue());
281 final MultipartType nextType = iterator.next();
282 LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
284 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
285 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
// Success: continue with the next type using the same iterator and result future.
287 public void onSuccess(final Boolean result) {
288 statChainFuture(iterator, resultFuture, initial);
// Failure: propagate the cause to the caller's future.
291 public void onFailure(@Nonnull final Throwable t) {
292 resultFuture.setException(t);
298 * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
299 * which has to be returned from caller too
304 ListenableFuture<Boolean> deviceConnectionCheck() {
305 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
306 ListenableFuture<Boolean> resultingFuture;
307 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
309 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
310 deviceContext.getPrimaryConnectionContext().getConnectionState());
311 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
314 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
317 return resultingFuture;
322 //TODO: Refactor twice sending deviceContext into gatheringStatistics
/**
 * Gathers flow statistics when the device state reports them as available.
 *
 * <p>The request goes through {@code StatisticsGatheringUtils.gatherStatistics} using
 * {@code statisticsGatheringOnTheFlyService}; otherwise {@code emptyFuture} is returned.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPFLOW is expected
 * @param initial passed on to the gathering call
 * @return the future of the gathering call, or {@code emptyFuture} when flow statistics are unavailable
 */
323 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
324 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
325 statisticsGatheringOnTheFlyService,
327 /*MultipartType.OFPMPFLOW*/ multipartType,
330 initial, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers table statistics through {@code statisticsGatheringService} when the device state reports
 * them as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPTABLE is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
333 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
334 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
335 statisticsGatheringService,
337 /*MultipartType.OFPMPTABLE*/ multipartType,
340 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers port statistics through {@code statisticsGatheringService} when the device state reports
 * them as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPPORTSTATS is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
343 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
344 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
345 statisticsGatheringService,
347 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
350 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers queue statistics through {@code statisticsGatheringService} when the device state reports
 * them as available; otherwise returns {@code emptyFuture}. The condition is written negated here,
 * so the unavailable case comes first in the conditional expression.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPQUEUE is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
353 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
354 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
355 statisticsGatheringService,
357 /*MultipartType.OFPMPQUEUE*/ multipartType,
360 false, multipartReplyTranslator);
/**
 * Gathers group description statistics through {@code statisticsGatheringService} when the device
 * state reports groups as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPGROUPDESC is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
363 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
364 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
365 statisticsGatheringService,
367 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
370 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers group statistics through {@code statisticsGatheringService} when the device state reports
 * groups as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPGROUP is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
373 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
374 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
375 statisticsGatheringService,
377 /*MultipartType.OFPMPGROUP*/ multipartType,
380 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers meter configuration statistics through {@code statisticsGatheringService} when the device
 * state reports meters as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPMETERCONFIG is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
383 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
384 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
385 statisticsGatheringService,
387 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
390 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers meter statistics through {@code statisticsGatheringService} when the device state reports
 * meters as available; otherwise returns {@code emptyFuture}.
 * NOTE(review): some argument lines of the call are not visible in this listing - confirm the full list.
 *
 * @param multipartType type put into the request; the inline comment indicates OFPMPMETER is expected
 * @return the future of the gathering call, or {@code emptyFuture}
 */
393 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
394 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
395 statisticsGatheringService,
397 /*MultipartType.OFPMPMETER*/ multipartType,
400 false, multipartReplyTranslator) : emptyFuture;
404 void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
405 this.statisticsGatheringService = statisticsGatheringService;
409 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
410 statisticsGatheringOnTheFlyService) {
411 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
415 public ItemLifecycleListener getItemLifeCycleListener () {
416 return itemLifeCycleListener;
// Accessor for the context state.
// NOTE(review): the body is not visible in this listing - presumably it returns the volatile "state" field; confirm.
420 public CONTEXT_STATE getState() {
// Mutator for the context state; called by the constructor (INITIALIZATION) and by close() (TERMINATION).
// NOTE(review): the body is not visible in this listing - presumably it assigns the volatile "state" field; confirm.
425 public void setState(CONTEXT_STATE state) {
430 public ServiceGroupIdentifier getServiceIdentifier() {
431 return this.deviceInfo.getServiceIdentifier();
435 public DeviceInfo getDeviceInfo() {
436 return this.deviceInfo;
/**
 * Asks the statistics manager to stop scheduling for this device.
 * NOTE(review): one line directly after the signature is not visible in this listing - confirm nothing
 * else runs before scheduling is stopped.
 *
 * @param deviceDisconnected not used by the visible statements
 * @return an already completed future holding {@code null}
 */
440 public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
442 myManager.stopScheduling(deviceInfo);
443 return Futures.immediateFuture(null);
447 public DeviceState gainDeviceState() {
448 return gainDeviceContext().getDeviceState();
452 public DeviceContext gainDeviceContext() {
453 return this.lifecycleService.getDeviceContext();
457 public void stopGatheringData() {
458 if (Objects.nonNull(this.lastDataGathering)) {
459 if (LOG.isDebugEnabled()) {
460 LOG.debug("Stop the running statistics gathering for node {}", this.deviceInfo.getLOGValue());
462 this.lastDataGathering.cancel(true);
467 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
468 this.clusterInitializationPhaseHandler = handler;
472 public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
474 if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
475 LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
479 LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
481 this.statListForCollectingInitialization();
482 Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
485 public void onSuccess(@Nullable Boolean aBoolean) {
486 initialSubmitHandler.initialSubmitTransaction();
490 public void onFailure(Throwable throwable) {
491 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
492 lifecycleService.closeConnection();
496 if (!this.shuttingDownStatisticsPolling) {
497 myManager.startScheduling(deviceInfo);
500 return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
504 public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
505 this.initialSubmitHandler = initialSubmitHandler;