2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import javax.annotation.Nonnull;
27 import javax.annotation.Nullable;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
37 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
39 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
41 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
42 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
43 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
44 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
45 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
/**
 * Statistics context of a single device. It decides which multipart statistics types are gathered
 * (based on the device state), chains the per-type gathering requests one after another and keeps
 * track of the request contexts it created so that they can be failed when the context is closed.
 */
51 class StatisticsContextImpl implements StatisticsContext {
53 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
// RPC error text used when outstanding request contexts are closed by close().
54 private static final String CONNECTION_CLOSED = "Connection closed.";
56 private final ItemLifecycleListener itemLifeCycleListener;
// Request contexts handed out by createRequestContext(); each one removes itself when closed.
57 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
58 private final DeviceContext deviceContext;
// Device state queried for the "is ... available" capability flags.
59 private final DeviceState devState;
// Already completed future (value false) returned when a statistics type is not available.
60 private final ListenableFuture<Boolean> emptyFuture;
// When true, gathering is skipped and initial collection/scheduling is not started.
61 private final boolean shuttingDownStatisticsPolling;
// Monitor protecting collectingStatType.
62 private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
63 private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
// Immutable list of statistics types to gather; replaced as a whole under the lock.
64 @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
65 private List<MultipartType> collectingStatType;
// The two gathering services are non-final because package-private setters may replace them.
67 private StatisticsGatheringService statisticsGatheringService;
68 private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
// Pending poll timeout, cancelled by close() when it has not expired yet; may be null.
69 private Timeout pollTimeout;
70 private final DeviceInfo deviceInfo;
// Manager asked to start/stop scheduling for this device.
71 private final StatisticsManager myManager;
72 private final LifecycleService lifecycleService;
74 private volatile boolean schedulingEnabled;
75 private volatile CONTEXT_STATE state;
// Next handler in the initialization chain, invoked at the end of onContextInstantiateService().
76 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
// Handler whose initialSubmitTransaction() is called after a successful initial gathering.
77 private ClusterInitializationPhaseHandler initialSubmitHandler;
/**
 * Creates the statistics context for one device.
 *
 * @param deviceInfo                    identity of the device this context belongs to
 * @param shuttingDownStatisticsPolling when {@code true}, statistics gathering is skipped
 * @param lifecycleService              lifecycle service providing the device context
 * @param convertorExecutor             converter used by the reply translator and the on-the-fly service
 * @param myManager                     manager used to start and stop scheduling for the device
 */
StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
                      final boolean shuttingDownStatisticsPolling,
                      @Nonnull final LifecycleService lifecycleService,
                      @Nonnull final ConvertorExecutor convertorExecutor,
                      @Nonnull final StatisticsManager myManager) {
    // Assign every plain field first: below, `this` is handed to collaborators and public
    // (overridable) methods are invoked, and none of them should observe an unset final field.
    this.deviceInfo = deviceInfo;
    this.myManager = myManager;
    this.lifecycleService = lifecycleService;
    this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
    this.deviceContext = lifecycleService.getDeviceContext();
    this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
    this.emptyFuture = Futures.immediateFuture(false);
    this.multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
    this.itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);

    // From here on the partially constructed context escapes to the gathering services.
    this.statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
    this.statisticsGatheringOnTheFlyService =
            new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);

    statListForCollectingInitialization();
    setState(CONTEXT_STATE.INITIALIZATION);
}
100 public void statListForCollectingInitialization() {
101 synchronized (COLLECTION_STAT_TYPE_LOCK) {
102 final List<MultipartType> statListForCollecting = new ArrayList<>();
103 if (devState.isTableStatisticsAvailable()) {
104 statListForCollecting.add(MultipartType.OFPMPTABLE);
106 if (devState.isFlowStatisticsAvailable()) {
107 statListForCollecting.add(MultipartType.OFPMPFLOW);
109 if (devState.isGroupAvailable()) {
110 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
111 statListForCollecting.add(MultipartType.OFPMPGROUP);
113 if (devState.isMetersAvailable()) {
114 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
115 statListForCollecting.add(MultipartType.OFPMPMETER);
117 if (devState.isPortStatisticsAvailable()) {
118 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
120 if (devState.isQueueStatisticsAvailable()) {
121 statListForCollecting.add(MultipartType.OFPMPQUEUE);
123 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
129 public ListenableFuture<Boolean> initialGatherDynamicData() {
130 return gatherDynamicData(true);
134 public ListenableFuture<Boolean> gatherDynamicData(){
135 return gatherDynamicData(false);
138 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
139 if (shuttingDownStatisticsPolling) {
140 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
141 return Futures.immediateFuture(Boolean.TRUE);
143 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
144 if (errorResultFuture != null) {
145 return errorResultFuture;
147 synchronized (COLLECTION_STAT_TYPE_LOCK) {
148 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
149 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
151 // write start timestamp to state snapshot container
152 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
154 statChainFuture(statIterator, settableStatResultFuture, initial);
156 // write end timestamp to state snapshot container
157 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
159 public void onSuccess(@Nullable final Boolean result) {
160 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
163 public void onFailure(final Throwable t) {
164 if (!(t instanceof TransactionChainClosedException)) {
165 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
169 return settableStatResultFuture;
173 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
174 switch (multipartType) {
176 return collectFlowStatistics(multipartType, initial);
178 return collectTableStatistics(multipartType);
180 return collectPortStatistics(multipartType);
182 return collectQueueStatistics(multipartType);
184 return collectGroupDescStatistics(multipartType);
186 return collectGroupStatistics(multipartType);
187 case OFPMPMETERCONFIG:
188 return collectMeterConfigStatistics(multipartType);
190 return collectMeterStatistics(multipartType);
192 LOG.warn("Unsuported Statistics type {}", multipartType);
193 return Futures.immediateCheckedFuture(Boolean.TRUE);
199 public <T> RequestContext<T> createRequestContext() {
200 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
202 public void close() {
203 requestContexts.remove(this);
206 requestContexts.add(ret);
211 public void close() {
212 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
213 if (LOG.isDebugEnabled()) {
214 LOG.debug("Statistics context is already in state TERMINATION.");
217 setState(CONTEXT_STATE.TERMINATION);
218 schedulingEnabled = false;
219 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
220 iterator.hasNext(); ) {
221 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
223 if (null != pollTimeout && !pollTimeout.isExpired()) {
224 pollTimeout.cancel();
230 public void setSchedulingEnabled(final boolean schedulingEnabled) {
231 this.schedulingEnabled = schedulingEnabled;
235 public boolean isSchedulingEnabled() {
236 return schedulingEnabled;
240 public void setPollTimeout(final Timeout pollTimeout) {
241 this.pollTimeout = pollTimeout;
245 public Optional<Timeout> getPollTimeout() {
246 return Optional.ofNullable(pollTimeout);
/**
 * Gathers the statistics types supplied by the iterator strictly one after another: the request
 * for the next type is started from the success callback of the previous one.
 *
 * @param iterator     remaining statistics types of the current round
 * @param resultFuture future of the whole round; set to TRUE once the iterator is exhausted, failed
 *                     when the primary connection is in state RIP or when one type fails
 * @param initial      passed on to chooseStat for every type
 */
249 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
// A primary connection in state RIP fails the round with an IllegalStateException.
250 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
251 final String errMsg = String.format("Device connection is closed for Node : %s.",
252 getDeviceInfo().getNodeId());
254 resultFuture.setException(new IllegalStateException(errMsg));
// Nothing left to gather: the round is reported as successful.
257 if ( ! iterator.hasNext()) {
258 resultFuture.set(Boolean.TRUE);
259 LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
263 final MultipartType nextType = iterator.next();
264 LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
266 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
267 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
// The boolean result of a single type is ignored; success just advances the chain.
269 public void onSuccess(final Boolean result) {
270 statChainFuture(iterator, resultFuture, initial);
// The first failing type ends the chain and fails the whole round with the same cause.
273 public void onFailure(@Nonnull final Throwable t) {
274 resultFuture.setException(t);
280 * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
281 * which has to be returned from caller too
286 ListenableFuture<Boolean> deviceConnectionCheck() {
287 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
288 ListenableFuture<Boolean> resultingFuture;
289 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
291 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
292 deviceContext.getPrimaryConnectionContext().getConnectionState());
293 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
296 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
299 return resultingFuture;
304 //TODO: Refactor twice sending deviceContext into gatheringStatistics
/**
 * Gathers flow statistics through the on-the-fly gathering service when the device state reports
 * flow statistics as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @param initial       passed to the gathering utility unchanged
 * @return future of the gathering, or the shared emptyFuture (completed with false) when flow
 *         statistics are not available
 */
305 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
306 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
307 statisticsGatheringOnTheFlyService,
309 /*MultipartType.OFPMPFLOW*/ multipartType,
312 initial, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers table statistics through the regular gathering service when the device state reports
 * table statistics as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when table
 *         statistics are not available
 */
315 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
316 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
317 statisticsGatheringService,
319 /*MultipartType.OFPMPTABLE*/ multipartType,
322 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers port statistics through the regular gathering service when the device state reports
 * port statistics as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when port
 *         statistics are not available
 */
325 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
326 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
327 statisticsGatheringService,
329 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
332 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers queue statistics through the regular gathering service when the device state reports
 * queue statistics as available. Unlike its sibling methods the condition is written negated, so
 * the "not available" branch comes first.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return the shared emptyFuture (completed with false) when queue statistics are not available,
 *         otherwise the future of the gathering
 */
335 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
336 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
337 statisticsGatheringService,
339 /*MultipartType.OFPMPQUEUE*/ multipartType,
342 false, multipartReplyTranslator);
/**
 * Gathers group description statistics through the regular gathering service when the device
 * state reports groups as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when groups
 *         are not available
 */
345 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
346 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
347 statisticsGatheringService,
349 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
352 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers group statistics through the regular gathering service when the device state reports
 * groups as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when groups
 *         are not available
 */
355 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
356 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
357 statisticsGatheringService,
359 /*MultipartType.OFPMPGROUP*/ multipartType,
362 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers meter configuration statistics through the regular gathering service when the device
 * state reports meters as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when meters
 *         are not available
 */
365 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
366 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
367 statisticsGatheringService,
369 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
372 false, multipartReplyTranslator) : emptyFuture;
/**
 * Gathers meter statistics through the regular gathering service when the device state reports
 * meters as available.
 *
 * @param multipartType statistics type passed to the gathering utility
 * @return future of the gathering, or the shared emptyFuture (completed with false) when meters
 *         are not available
 */
375 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
376 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
377 statisticsGatheringService,
379 /*MultipartType.OFPMPMETER*/ multipartType,
382 false, multipartReplyTranslator) : emptyFuture;
386 void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
387 this.statisticsGatheringService = statisticsGatheringService;
391 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
392 statisticsGatheringOnTheFlyService) {
393 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
397 public ItemLifecycleListener getItemLifeCycleListener () {
398 return itemLifeCycleListener;
/**
 * Lifecycle state accessor; close() compares its result against CONTEXT_STATE.TERMINATION.
 * Presumably backed by the volatile {@code state} field - confirm against the method body.
 */
402 public CONTEXT_STATE getState() {
/**
 * Lifecycle state mutator; called with INITIALIZATION by the constructor and with TERMINATION by
 * close(). Presumably stores into the volatile {@code state} field - confirm against the method body.
 *
 * @param state new lifecycle state
 */
407 public void setState(CONTEXT_STATE state) {
412 public ServiceGroupIdentifier getServiceIdentifier() {
413 return this.deviceInfo.getServiceIdentifier();
417 public DeviceInfo getDeviceInfo() {
418 return this.deviceInfo;
422 public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
423 myManager.stopScheduling(deviceInfo);
424 return Futures.immediateFuture(null);
428 public LifecycleService getLifecycleService() {
429 return lifecycleService;
433 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
434 this.clusterInitializationPhaseHandler = handler;
/**
 * Initialization phase of the statistics context. Unless polling is shut down it refreshes the list
 * of statistics types, starts the initial gathering round and asks the manager to start scheduling;
 * finally it delegates to the next handler of the initialization chain.
 *
 * @param connectionContext connection whose state is checked first and which is passed on to the
 *                          next handler
 * @return the result of the next handler's onContextInstantiateService call
 */
438 public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
// A connection already in state RIP is only reported here with a warning.
440 if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
441 LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
445 if (!this.shuttingDownStatisticsPolling) {
447 LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
// Device capabilities are re-read before the initial round.
449 this.statListForCollectingInitialization();
450 Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
// Successful initial gathering triggers the initial transaction submit.
453 public void onSuccess(@Nullable Boolean aBoolean) {
454 initialSubmitHandler.initialSubmitTransaction();
// Failed initial gathering closes the connection through the lifecycle service.
458 public void onFailure(Throwable throwable) {
459 LOG.warn("Initial gathering statistics unsuccessful for node {}", deviceInfo.getLOGValue());
460 lifecycleService.closeConnection();
// Scheduling is requested without waiting for the callback above.
464 myManager.startScheduling(deviceInfo);
468 return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext);
472 public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
473 this.initialSubmitHandler = initialSubmitHandler;