2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.HashSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
31 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
37 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
38 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
39 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
40 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
41 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
42 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
43 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
44 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
49 class StatisticsContextImpl implements StatisticsContext {
51 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
52 private static final String CONNECTION_CLOSED = "Connection closed.";
54 private final ItemLifecycleListener itemLifeCycleListener;
55 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
56 private final DeviceContext deviceContext;
57 private final DeviceState devState;
58 private final ListenableFuture<Boolean> emptyFuture;
59 private final boolean shuttingDownStatisticsPolling;
60 private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
61 private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
62 @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
63 private List<MultipartType> collectingStatType;
65 private StatisticsGatheringService statisticsGatheringService;
66 private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
67 private Timeout pollTimeout;
68 private final DeviceInfo deviceInfo;
70 private volatile boolean schedulingEnabled;
71 private volatile CONTEXT_STATE state;
73 StatisticsContextImpl(@Nonnull final DeviceInfo deviceInfo,
74 @Nonnull final boolean shuttingDownStatisticsPolling,
75 @Nonnull final LifecycleConductor lifecycleConductor,
76 @Nonnull final ConvertorExecutor convertorExecutor) {
77 this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo));
78 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
79 this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
80 multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
81 emptyFuture = Futures.immediateFuture(false);
82 statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
83 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext, convertorExecutor);
84 itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
85 statListForCollectingInitialization();
86 setState(CONTEXT_STATE.INITIALIZATION);
87 this.deviceInfo = deviceInfo;
91 public void statListForCollectingInitialization() {
92 synchronized (COLLECTION_STAT_TYPE_LOCK) {
93 final List<MultipartType> statListForCollecting = new ArrayList<>();
94 if (devState.isTableStatisticsAvailable()) {
95 statListForCollecting.add(MultipartType.OFPMPTABLE);
97 if (devState.isFlowStatisticsAvailable()) {
98 statListForCollecting.add(MultipartType.OFPMPFLOW);
100 if (devState.isGroupAvailable()) {
101 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
102 statListForCollecting.add(MultipartType.OFPMPGROUP);
104 if (devState.isMetersAvailable()) {
105 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
106 statListForCollecting.add(MultipartType.OFPMPMETER);
108 if (devState.isPortStatisticsAvailable()) {
109 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
111 if (devState.isQueueStatisticsAvailable()) {
112 statListForCollecting.add(MultipartType.OFPMPQUEUE);
114 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
120 public ListenableFuture<Boolean> initialGatherDynamicData() {
121 return gatherDynamicData(true);
125 public ListenableFuture<Boolean> gatherDynamicData(){
126 return gatherDynamicData(false);
129 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
130 if (shuttingDownStatisticsPolling) {
131 LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId());
132 return Futures.immediateFuture(Boolean.TRUE);
134 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
135 if (errorResultFuture != null) {
136 return errorResultFuture;
138 synchronized (COLLECTION_STAT_TYPE_LOCK) {
139 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
140 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
142 // write start timestamp to state snapshot container
143 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
145 statChainFuture(statIterator, settableStatResultFuture, initial);
147 // write end timestamp to state snapshot container
148 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
150 public void onSuccess(@Nullable final Boolean result) {
151 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
154 public void onFailure(final Throwable t) {
155 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
158 return settableStatResultFuture;
162 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
163 switch (multipartType) {
165 return collectFlowStatistics(multipartType, initial);
167 return collectTableStatistics(multipartType);
169 return collectPortStatistics(multipartType);
171 return collectQueueStatistics(multipartType);
173 return collectGroupDescStatistics(multipartType);
175 return collectGroupStatistics(multipartType);
176 case OFPMPMETERCONFIG:
177 return collectMeterConfigStatistics(multipartType);
179 return collectMeterStatistics(multipartType);
181 LOG.warn("Unsuported Statistics type {}", multipartType);
182 return Futures.immediateCheckedFuture(Boolean.TRUE);
188 public <T> RequestContext<T> createRequestContext() {
189 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reserveXidForDeviceMessage()) {
191 public void close() {
192 requestContexts.remove(this);
195 requestContexts.add(ret);
200 public void close() {
201 if (CONTEXT_STATE.TERMINATION.equals(getState())) {
202 if (LOG.isDebugEnabled()) {
203 LOG.debug("Statistics context is already in state TERMINATION.");
206 setState(CONTEXT_STATE.TERMINATION);
207 schedulingEnabled = false;
208 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
209 iterator.hasNext(); ) {
210 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
212 if (null != pollTimeout && !pollTimeout.isExpired()) {
213 pollTimeout.cancel();
219 public void setSchedulingEnabled(final boolean schedulingEnabled) {
220 this.schedulingEnabled = schedulingEnabled;
224 public boolean isSchedulingEnabled() {
225 return schedulingEnabled;
229 public void setPollTimeout(final Timeout pollTimeout) {
230 this.pollTimeout = pollTimeout;
234 public Optional<Timeout> getPollTimeout() {
235 return Optional.ofNullable(pollTimeout);
238 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
239 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
240 final String errMsg = String.format("Device connection is closed for Node : %s.",
241 getDeviceInfo().getNodeId());
243 resultFuture.setException(new IllegalStateException(errMsg));
246 if ( ! iterator.hasNext()) {
247 resultFuture.set(Boolean.TRUE);
248 LOG.debug("Stats collection successfully finished for node {}", getDeviceInfo().getNodeId());
252 final MultipartType nextType = iterator.next();
253 LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getNodeId(), nextType);
255 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
256 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
258 public void onSuccess(final Boolean result) {
259 statChainFuture(iterator, resultFuture, initial);
262 public void onFailure(@Nonnull final Throwable t) {
263 resultFuture.setException(t);
269 * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
270 * which has to be returned from caller too
275 ListenableFuture<Boolean> deviceConnectionCheck() {
276 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
277 ListenableFuture<Boolean> resultingFuture;
278 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
280 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
281 deviceContext.getPrimaryConnectionContext().getConnectionState());
282 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
285 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
288 return resultingFuture;
293 //TODO: Refactor twice sending deviceContext into gatheringStatistics
294 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
295 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
296 statisticsGatheringOnTheFlyService,
298 /*MultipartType.OFPMPFLOW*/ multipartType,
301 initial, multipartReplyTranslator) : emptyFuture;
304 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
305 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
306 statisticsGatheringService,
308 /*MultipartType.OFPMPTABLE*/ multipartType,
311 false, multipartReplyTranslator) : emptyFuture;
314 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
315 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
316 statisticsGatheringService,
318 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
321 false, multipartReplyTranslator) : emptyFuture;
324 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
325 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
326 statisticsGatheringService,
328 /*MultipartType.OFPMPQUEUE*/ multipartType,
331 false, multipartReplyTranslator);
334 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
335 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
336 statisticsGatheringService,
338 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
341 false, multipartReplyTranslator) : emptyFuture;
344 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
345 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
346 statisticsGatheringService,
348 /*MultipartType.OFPMPGROUP*/ multipartType,
351 false, multipartReplyTranslator) : emptyFuture;
354 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
355 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
356 statisticsGatheringService,
358 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
361 false, multipartReplyTranslator) : emptyFuture;
364 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
365 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
366 statisticsGatheringService,
368 /*MultipartType.OFPMPMETER*/ multipartType,
371 false, multipartReplyTranslator) : emptyFuture;
375 void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
376 this.statisticsGatheringService = statisticsGatheringService;
380 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
381 statisticsGatheringOnTheFlyService) {
382 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
386 public ItemLifecycleListener getItemLifeCycleListener () {
387 return itemLifeCycleListener;
391 public CONTEXT_STATE getState() {
396 public void setState(CONTEXT_STATE state) {
401 public ServiceGroupIdentifier getServiceIdentifier() {
402 return this.deviceInfo.getServiceIdentifier();
406 public DeviceInfo getDeviceInfo() {
407 return this.deviceInfo;
411 public void startupClusterServices() throws ExecutionException, InterruptedException {
412 this.statListForCollectingInitialization();
413 this.initialGatherDynamicData();
417 public ListenableFuture<Void> stopClusterServices() {
418 return Futures.immediateFuture(null);