- if (multipartData instanceof GroupStatisticsUpdated) {
- processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterStatisticsUpdated) {
- processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
- processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowTableStatisticsUpdate) {
- processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof QueueStatisticsUpdate) {
- processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowsStatisticsUpdate) {
- processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
- EventsTimeCounter.markEnd(eventIdentifier);
- } else if (multipartData instanceof GroupDescStatsUpdated) {
- processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterConfigStatsUpdated) {
- processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
- } else {
- isMultipartProcessed = Boolean.FALSE;
+
+ try {
+ if (multipartData instanceof GroupStatisticsUpdated) {
+ processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceInfo, txFacade);
+ } else if (multipartData instanceof MeterStatisticsUpdated) {
+ processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceInfo, txFacade);
+ } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
+ processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceInfo, txFacade);
+ } else if (multipartData instanceof FlowTableStatisticsUpdate) {
+ processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceInfo, txFacade);
+ } else if (multipartData instanceof QueueStatisticsUpdate) {
+ processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceInfo, txFacade);
+ } else if (multipartData instanceof FlowsStatisticsUpdate) {
+ /* FlowStat Processing is realized by NettyThread only by initPhase, otherwise it is realized
+ * by MD-SAL thread */
+ return processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceInfo, txFacade, registry.getDeviceFlowRegistry(), initial, eventIdentifier);
+
+ } else if (multipartData instanceof GroupDescStatsUpdated) {
+ processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceInfo, txFacade, registry.getDeviceGroupRegistry());
+ } else if (multipartData instanceof MeterConfigStatsUpdated) {
+ processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceInfo, txFacade, registry.getDeviceMeterRegistry());
+ } else {
+ isMultipartProcessed = Boolean.FALSE;
+ }
+ } catch (final Exception e) {
+ LOG.warn("stats processing of type {} for node {} failed during write-to-tx step",
+ type, deviceInfo.getNodeId(), e);
+ return Futures.immediateFailedFuture(e);