final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
ofpQueuToRequestContextEventIdentifier, type));
- return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier);
+ return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier, type);
}
private static ListenableFuture<Boolean> transformAndStoreStatisticsData(final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture,
final DeviceContext deviceContext,
- final EventIdentifier eventIdentifier) {
+ final EventIdentifier eventIdentifier, final MultipartType type) {
return Futures.transform(statisticsDataInFuture, new Function<RpcResult<List<MultipartReply>>, Boolean>() {
@Nullable
@Override
public Boolean apply(final RpcResult<List<MultipartReply>> rpcResult) {
if (rpcResult.isSuccessful()) {
+ LOG.debug("Stats reply successfully received for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
boolean isMultipartProcessed = Boolean.TRUE;
// TODO: in case the result value is null then multipart data probably got processed on the fly -
if (null != rpcResult.getResult()) {
Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
DataObject multipartData = null;
- for (final MultipartReply singleReply : rpcResult.getResult()) {
- final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
- multipartData = multipartDataList.get(0);
- allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+
+
+ try {
+ for (final MultipartReply singleReply : rpcResult.getResult()) {
+ final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
+ multipartData = multipartDataList.get(0);
+ allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+ }
+ } catch (Exception e) {
+ LOG.warn("stats processing of type {} for node {} failed during transfomation step",
+ type, deviceContext.getDeviceState().getNodeId(), e);
+ throw e;
}
- if (multipartData instanceof GroupStatisticsUpdated) {
- processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterStatisticsUpdated) {
- processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
- processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowTableStatisticsUpdate) {
- processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof QueueStatisticsUpdate) {
- processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowsStatisticsUpdate) {
- processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
- EventsTimeCounter.markEnd(eventIdentifier);
- } else if (multipartData instanceof GroupDescStatsUpdated) {
- processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterConfigStatsUpdated) {
- processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
- } else {
- isMultipartProcessed = Boolean.FALSE;
+
+ try {
+ if (multipartData instanceof GroupStatisticsUpdated) {
+ processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof MeterStatisticsUpdated) {
+ processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
+ processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof FlowTableStatisticsUpdate) {
+ processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof QueueStatisticsUpdate) {
+ processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof FlowsStatisticsUpdate) {
+ processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
+ EventsTimeCounter.markEnd(eventIdentifier);
+ } else if (multipartData instanceof GroupDescStatsUpdated) {
+ processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof MeterConfigStatsUpdated) {
+ processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
+ } else {
+ isMultipartProcessed = Boolean.FALSE;
+ }
+ } catch (Exception e) {
+ LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
+ type, deviceContext.getDeviceState().getNodeId(), e);
+ throw e;
}
+
+ LOG.debug("Stats reply added to transaction for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
+
//TODO : implement experimenter
+ } else {
+ LOG.debug("Stats reply was empty for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
}
return isMultipartProcessed;
+ } else {
+ LOG.debug("Stats reply FAILED for node {} of type {}: {}", deviceContext.getDeviceState().getNodeId(), type, rpcResult.getErrors());
}
return Boolean.FALSE;
}
import io.netty.util.TimerTask;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private void pollStatistics(final DeviceContext deviceContext,
final StatisticsContext statisticsContext,
final TimeCounter timeCounter) {
+ LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
timeCounter.markStart();
ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
}
});
+
+ final long STATS_TIMEOUT_SEC = 20L;
+ try {
+ deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
+ } catch (TimeoutException e) {
+ LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
+ }
}
private void scheduleNextPolling(final DeviceContext deviceContext,
final StatisticsContext statisticsContext,
final TimeCounter timeCounter) {
if (null != hashedWheelTimer) {
+ LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
if (!shuttingDownStatisticsPolling) {
Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
@Override
}, currentTimerDelay, TimeUnit.MILLISECONDS);
statisticsContext.setPollTimeout(pollTimeout);
}
+ } else {
+ LOG.debug("#!NOT SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
}
}