BUG-3774: 100k flows initial stats fail - logging 68/30968/5
author Michal Rehak <mirehak@cisco.com>
Tue, 8 Dec 2015 09:47:37 +0000 (10:47 +0100)
committer Michal Rehak <mirehak@cisco.com>
Fri, 8 Jan 2016 12:45:27 +0000 (13:45 +0100)
 - added stats schedule and polling logs
 - added much more logging
 - added try-catch to statistics processing

Change-Id: If953ff9d72c06cd57bd3158a407bfdfdcd370860
Signed-off-by: Michal Rehak <mirehak@cisco.com>
(cherry picked from commit 134eb753830bb774ddf3257c6a3e86cfcbdbeffc)

openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringOnTheFlyService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImplTest.java

index 301d0061fbd97c226ea5ce413489da571eca6589..3ef1a95eb753bc1d1179f0e0f45a4f0b931b66ef 100644 (file)
@@ -160,9 +160,14 @@ public class StatisticsContextImpl implements StatisticsContext {
     void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture) {
         if ( ! iterator.hasNext()) {
             resultFuture.set(Boolean.TRUE);
+            LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
             return;
         }
-        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(iterator.next());
+
+        final MultipartType nextType = iterator.next();
+        LOG.debug("Stats iterating to next type for node {} of type {}", deviceContext.getDeviceState().getNodeId(), nextType);
+
+        final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(final Boolean result) {
index 06d8d9d5b60970d37718217d793ea45f2795bab3..4730e085bd6e552e7677e2ef29971aa7eaa22e4b 100644 (file)
@@ -124,17 +124,18 @@ public final class StatisticsGatheringUtils {
         final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
                 JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
                         ofpQueuToRequestContextEventIdentifier, type));
-        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier);
+        return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier, type);
     }
 
     private static ListenableFuture<Boolean> transformAndStoreStatisticsData(final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture,
                                                                              final DeviceContext deviceContext,
-                                                                             final EventIdentifier eventIdentifier) {
+                                                                             final EventIdentifier eventIdentifier, final MultipartType type) {
         return Futures.transform(statisticsDataInFuture, new Function<RpcResult<List<MultipartReply>>, Boolean>() {
             @Nullable
             @Override
             public Boolean apply(final RpcResult<List<MultipartReply>> rpcResult) {
                 if (rpcResult.isSuccessful()) {
+                    LOG.debug("Stats reply successfully received for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
                     boolean isMultipartProcessed = Boolean.TRUE;
 
                     // TODO: in case the result value is null then multipart data probably got processed on the fly -
@@ -142,36 +143,58 @@ public final class StatisticsGatheringUtils {
                     if (null != rpcResult.getResult()) {
                         Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
                         DataObject multipartData = null;
-                        for (final MultipartReply singleReply : rpcResult.getResult()) {
-                            final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
-                            multipartData = multipartDataList.get(0);
-                            allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+
+
+                        try {
+                            for (final MultipartReply singleReply : rpcResult.getResult()) {
+                                final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
+                                multipartData = multipartDataList.get(0);
+                                allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+                            }
+                        } catch (Exception e) {
+                            LOG.warn("stats processing of type {} for node {} failed during transfomation step",
+                                    type, deviceContext.getDeviceState().getNodeId(), e);
+                            throw e;
                         }
 
-                        if (multipartData instanceof GroupStatisticsUpdated) {
-                            processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof MeterStatisticsUpdated) {
-                            processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
-                            processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof FlowTableStatisticsUpdate) {
-                            processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof QueueStatisticsUpdate) {
-                            processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof FlowsStatisticsUpdate) {
-                            processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
-                            EventsTimeCounter.markEnd(eventIdentifier);
-                        } else if (multipartData instanceof GroupDescStatsUpdated) {
-                            processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
-                        } else if (multipartData instanceof MeterConfigStatsUpdated) {
-                            processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
-                        } else {
-                            isMultipartProcessed = Boolean.FALSE;
+
+                        try {
+                            if (multipartData instanceof GroupStatisticsUpdated) {
+                                processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof MeterStatisticsUpdated) {
+                                processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
+                                processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof FlowTableStatisticsUpdate) {
+                                processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof QueueStatisticsUpdate) {
+                                processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof FlowsStatisticsUpdate) {
+                                processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
+                                EventsTimeCounter.markEnd(eventIdentifier);
+                            } else if (multipartData instanceof GroupDescStatsUpdated) {
+                                processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
+                            } else if (multipartData instanceof MeterConfigStatsUpdated) {
+                                processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
+                            } else {
+                                isMultipartProcessed = Boolean.FALSE;
+                            }
+                        } catch (Exception e) {
+                            LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
+                                    type, deviceContext.getDeviceState().getNodeId(), e);
+                            throw e;
                         }
+
+                        LOG.debug("Stats reply added to transaction for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
+
                         //TODO : implement experimenter
+                    } else {
+                        LOG.debug("Stats reply was empty for node {} of type {}", deviceContext.getDeviceState().getNodeId(), type);
                     }
 
                     return isMultipartProcessed;
+                } else {
+                    LOG.debug("Stats reply FAILED for node {} of type {}: {}", deviceContext.getDeviceState().getNodeId(), type, rpcResult.getErrors());
                 }
                 return Boolean.FALSE;
             }
index 7a2a55ef58736ca700ac4c7e851668fa7913f9f7..d360e9f0ebd14a0ebdd540b766a6bbba82ea89ed 100644 (file)
@@ -18,9 +18,11 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -34,10 +36,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.GetStatisticsWorkModeOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsManagerControlService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.sm.control.rev150812.StatisticsWorkMode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,6 +138,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
     private void pollStatistics(final DeviceContext deviceContext,
                                 final StatisticsContext statisticsContext,
                                 final TimeCounter timeCounter) {
+        LOG.debug("POLLING ALL STATS for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
         timeCounter.markStart();
         ListenableFuture<Boolean> deviceStatisticsCollectionFuture = statisticsContext.gatherDynamicData();
         Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@@ -155,12 +158,22 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                 scheduleNextPolling(deviceContext, statisticsContext, timeCounter);
             }
         });
+
+        final long STATS_TIMEOUT_SEC = 20L;
+        try {
+            deviceStatisticsCollectionFuture.get(STATS_TIMEOUT_SEC, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.warn("Statistics collection for node {} failed", deviceContext.getDeviceState().getNodeId(), e);
+        } catch (TimeoutException e) {
+            LOG.info("Statistics collection for node {} still in progress even after {} secs", deviceContext.getDeviceState().getNodeId(), STATS_TIMEOUT_SEC);
+        }
     }
 
     private void scheduleNextPolling(final DeviceContext deviceContext,
                                      final StatisticsContext statisticsContext,
                                      final TimeCounter timeCounter) {
         if (null != hashedWheelTimer) {
+            LOG.debug("SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
             if (!shuttingDownStatisticsPolling) {
                 Timeout pollTimeout = hashedWheelTimer.newTimeout(new TimerTask() {
                     @Override
@@ -170,6 +183,8 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                 }, currentTimerDelay, TimeUnit.MILLISECONDS);
                 statisticsContext.setPollTimeout(pollTimeout);
             }
+        } else {
+            LOG.debug("#!NOT SCHEDULING NEXT STATS POLLING for device: {}", deviceContext.getDeviceState().getNodeId().getValue());
         }
     }
 
index a1310c221c54cf50204886ab9a05d23fdff11a57..a38c4b480a44d13a687fd933e7638a3ec358c7c4 100644 (file)
@@ -22,18 +22,23 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * collects statistics and processes them on the fly
  */
 public class StatisticsGatheringOnTheFlyService extends AbstractMultipartOnTheFlyService<MultipartType> implements StatisticsGatherer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringOnTheFlyService.class);
+
     public StatisticsGatheringOnTheFlyService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
         super(requestContextStack, deviceContext);
     }
 
     @Override
     public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final EventIdentifier eventIdentifier, final MultipartType type) {
+        LOG.debug("Getting statistics (onTheFly) for node {} of type {}", getDeviceContext().getDeviceState().getNodeId(), type);
         EventsTimeCounter.markStart(eventIdentifier);
         setEventIdentifier(eventIdentifier);
         return handleServiceCall(type);
index d61d77a015487dbdc3d223aa4e3afe10d236af60..9a2eb17de6d38b62b479bea658d4923b969b3af6 100644 (file)
@@ -22,18 +22,23 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 4.4.2015.
  */
 public class StatisticsGatheringService extends AbstractMultipartService<MultipartType> implements StatisticsGatherer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(StatisticsGatheringService.class);
+
     public StatisticsGatheringService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
         super(requestContextStack, deviceContext);
     }
 
     @Override
     public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final EventIdentifier eventIdentifier, final MultipartType type) {
+        LOG.debug("Getting statistics for node {} of type {}", getDeviceContext().getDeviceState().getNodeId(), type);
         EventsTimeCounter.markStart(eventIdentifier);
         setEventIdentifier(eventIdentifier);
         return handleServiceCall(type);
index 328b8f10cec6ba2a5d427141dbb32ac1d418964a..8c0b0bc2e72149962c5c652dd17040c2c3265be6 100644 (file)
@@ -120,6 +120,8 @@ public class StatisticsManagerImplTest {
         when(mockedDeviceState.isQueueStatisticsAvailable()).thenReturn(true);
         when(mockedDeviceState.isTableStatisticsAvailable()).thenReturn(true);
 
+        when(mockedDeviceState.getNodeId()).thenReturn(new NodeId("ofp-unit-dummy-node-id"));
+
         when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimConnectionContext);
         when(mockedDeviceContext.getMessageSpy()).thenReturn(mockedMessagSpy);
         when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl());