2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.statistics;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.Iterators;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.util.Timeout;
20 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
21 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
22 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
23 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
24 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
25 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
26 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
27 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
28 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
29 import org.opendaylight.openflowplugin.impl.rpc.listener.ItemLifecycleListenerImpl;
30 import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
31 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
32 import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import javax.annotation.CheckForNull;
38 import javax.annotation.Nonnull;
39 import javax.annotation.Nullable;
40 import javax.annotation.concurrent.GuardedBy;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.HashSet;
44 import java.util.Iterator;
45 import java.util.List;
46 import java.util.Optional;
48 class StatisticsContextImpl implements StatisticsContext {
50 private static final Logger LOG = LoggerFactory.getLogger(StatisticsContextImpl.class);
51 private static final String CONNECTION_CLOSED = "Connection closed.";
53 private final ItemLifecycleListener itemLifeCycleListener;
54 private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
55 private final DeviceContext deviceContext;
56 private final DeviceState devState;
57 private final ListenableFuture<Boolean> emptyFuture;
58 private final boolean shuttingDownStatisticsPolling;
59 private final Object COLLECTION_STAT_TYPE_LOCK = new Object();
60 @GuardedBy("COLLECTION_STAT_TYPE_LOCK")
61 private List<MultipartType> collectingStatType;
63 private StatisticsGatheringService statisticsGatheringService;
64 private StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
65 private Timeout pollTimeout;
67 private volatile boolean schedulingEnabled;
69 StatisticsContextImpl(@CheckForNull final DeviceInfo deviceInfo, final boolean shuttingDownStatisticsPolling, final LifecycleConductor lifecycleConductor) {
70 this.deviceContext = Preconditions.checkNotNull(lifecycleConductor.getDeviceContext(deviceInfo));
71 this.devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
72 this.shuttingDownStatisticsPolling = shuttingDownStatisticsPolling;
73 emptyFuture = Futures.immediateFuture(false);
74 statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
75 statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
76 itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
77 statListForCollectingInitialization();
81 public void statListForCollectingInitialization() {
82 synchronized (COLLECTION_STAT_TYPE_LOCK) {
83 final List<MultipartType> statListForCollecting = new ArrayList<>();
84 if (devState.isTableStatisticsAvailable()) {
85 statListForCollecting.add(MultipartType.OFPMPTABLE);
87 if (devState.isFlowStatisticsAvailable()) {
88 statListForCollecting.add(MultipartType.OFPMPFLOW);
90 if (devState.isGroupAvailable()) {
91 statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
92 statListForCollecting.add(MultipartType.OFPMPGROUP);
94 if (devState.isMetersAvailable()) {
95 statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
96 statListForCollecting.add(MultipartType.OFPMPMETER);
98 if (devState.isPortStatisticsAvailable()) {
99 statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
101 if (devState.isQueueStatisticsAvailable()) {
102 statListForCollecting.add(MultipartType.OFPMPQUEUE);
104 collectingStatType = ImmutableList.<MultipartType>copyOf(statListForCollecting);
110 public ListenableFuture<Boolean> initialGatherDynamicData() {
111 return gatherDynamicData(true);
115 public ListenableFuture<Boolean> gatherDynamicData(){
116 return gatherDynamicData(false);
119 private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
120 if (shuttingDownStatisticsPolling) {
121 LOG.debug("Statistics for device {} is not enabled.", deviceContext.getDeviceInfo().getNodeId());
122 return Futures.immediateFuture(Boolean.TRUE);
124 final ListenableFuture<Boolean> errorResultFuture = deviceConnectionCheck();
125 if (errorResultFuture != null) {
126 return errorResultFuture;
128 synchronized (COLLECTION_STAT_TYPE_LOCK) {
129 final Iterator<MultipartType> statIterator = collectingStatType.iterator();
130 final SettableFuture<Boolean> settableStatResultFuture = SettableFuture.create();
132 // write start timestamp to state snapshot container
133 StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
135 statChainFuture(statIterator, settableStatResultFuture, initial);
137 // write end timestamp to state snapshot container
138 Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
140 public void onSuccess(@Nullable final Boolean result) {
141 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, true);
144 public void onFailure(final Throwable t) {
145 StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
148 return settableStatResultFuture;
152 private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
153 switch (multipartType) {
155 return collectFlowStatistics(multipartType, initial);
157 return collectTableStatistics(multipartType);
159 return collectPortStatistics(multipartType);
161 return collectQueueStatistics(multipartType);
163 return collectGroupDescStatistics(multipartType);
165 return collectGroupStatistics(multipartType);
166 case OFPMPMETERCONFIG:
167 return collectMeterConfigStatistics(multipartType);
169 return collectMeterStatistics(multipartType);
171 LOG.warn("Unsuported Statistics type {}", multipartType);
172 return Futures.immediateCheckedFuture(Boolean.TRUE);
178 public <T> RequestContext<T> createRequestContext() {
179 final AbstractRequestContext<T> ret = new AbstractRequestContext<T>(deviceContext.reserveXidForDeviceMessage()) {
181 public void close() {
182 requestContexts.remove(this);
185 requestContexts.add(ret);
190 public void close() {
191 schedulingEnabled = false;
192 for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
193 iterator.hasNext();) {
194 RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
196 if (null != pollTimeout && !pollTimeout.isExpired()) {
197 pollTimeout.cancel();
202 public void setSchedulingEnabled(final boolean schedulingEnabled) {
203 this.schedulingEnabled = schedulingEnabled;
207 public boolean isSchedulingEnabled() {
208 return schedulingEnabled;
212 public void setPollTimeout(final Timeout pollTimeout) {
213 this.pollTimeout = pollTimeout;
217 public Optional<Timeout> getPollTimeout() {
218 return Optional.ofNullable(pollTimeout);
221 private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
222 if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
223 final String errMsg = String.format("Device connection is closed for Node : %s.",
224 deviceContext.getDeviceInfo().getNodeId());
226 resultFuture.setException(new IllegalStateException(errMsg));
229 if ( ! iterator.hasNext()) {
230 resultFuture.set(Boolean.TRUE);
231 LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceInfo().getNodeId());
235 final MultipartType nextType = iterator.next();
236 LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceInfo().getNodeId(), nextType);
238 final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
239 Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
241 public void onSuccess(final Boolean result) {
242 statChainFuture(iterator, resultFuture, initial);
245 public void onFailure(@Nonnull final Throwable t) {
246 resultFuture.setException(t);
252 * Method checks a device state. It returns null for be able continue. Otherwise it returns immediateFuture
253 * which has to be returned from caller too
258 ListenableFuture<Boolean> deviceConnectionCheck() {
259 if (!ConnectionContext.CONNECTION_STATE.WORKING.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
260 ListenableFuture<Boolean> resultingFuture;
261 switch (deviceContext.getPrimaryConnectionContext().getConnectionState()) {
263 final String errMsg = String.format("Device connection doesn't exist anymore. Primary connection status : %s",
264 deviceContext.getPrimaryConnectionContext().getConnectionState());
265 resultingFuture = Futures.immediateFailedFuture(new Throwable(errMsg));
268 resultingFuture = Futures.immediateCheckedFuture(Boolean.TRUE);
271 return resultingFuture;
276 //TODO: Refactor twice sending deviceContext into gatheringStatistics
277 private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
278 return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
279 statisticsGatheringOnTheFlyService,
280 deviceContext.getDeviceInfo(),
281 /*MultipartType.OFPMPFLOW*/ multipartType,
284 initial) : emptyFuture;
287 private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
288 return devState.isTableStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
289 statisticsGatheringService,
290 deviceContext.getDeviceInfo(),
291 /*MultipartType.OFPMPTABLE*/ multipartType,
294 false) : emptyFuture;
297 private ListenableFuture<Boolean> collectPortStatistics(final MultipartType multipartType) {
298 return devState.isPortStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
299 statisticsGatheringService,
300 deviceContext.getDeviceInfo(),
301 /*MultipartType.OFPMPPORTSTATS*/ multipartType,
304 false) : emptyFuture;
307 private ListenableFuture<Boolean> collectQueueStatistics(final MultipartType multipartType) {
308 return !devState.isQueueStatisticsAvailable() ? emptyFuture : StatisticsGatheringUtils.gatherStatistics(
309 statisticsGatheringService,
310 deviceContext.getDeviceInfo(),
311 /*MultipartType.OFPMPQUEUE*/ multipartType,
317 private ListenableFuture<Boolean> collectGroupDescStatistics(final MultipartType multipartType) {
318 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
319 statisticsGatheringService,
320 deviceContext.getDeviceInfo(),
321 /*MultipartType.OFPMPGROUPDESC*/ multipartType,
324 false) : emptyFuture;
327 private ListenableFuture<Boolean> collectGroupStatistics(final MultipartType multipartType) {
328 return devState.isGroupAvailable() ? StatisticsGatheringUtils.gatherStatistics(
329 statisticsGatheringService,
330 deviceContext.getDeviceInfo(),
331 /*MultipartType.OFPMPGROUP*/ multipartType,
334 false) : emptyFuture;
337 private ListenableFuture<Boolean> collectMeterConfigStatistics(final MultipartType multipartType) {
338 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
339 statisticsGatheringService,
340 deviceContext.getDeviceInfo(),
341 /*MultipartType.OFPMPMETERCONFIG*/ multipartType,
344 false) : emptyFuture;
347 private ListenableFuture<Boolean> collectMeterStatistics(final MultipartType multipartType) {
348 return devState.isMetersAvailable() ? StatisticsGatheringUtils.gatherStatistics(
349 statisticsGatheringService,
350 deviceContext.getDeviceInfo(),
351 /*MultipartType.OFPMPMETER*/ multipartType,
354 false) : emptyFuture;
358 void setStatisticsGatheringService(final StatisticsGatheringService statisticsGatheringService) {
359 this.statisticsGatheringService = statisticsGatheringService;
363 void setStatisticsGatheringOnTheFlyService(final StatisticsGatheringOnTheFlyService
364 statisticsGatheringOnTheFlyService) {
365 this.statisticsGatheringOnTheFlyService = statisticsGatheringOnTheFlyService;
369 public ItemLifecycleListener getItemLifeCycleListener () {
370 return itemLifeCycleListener;