+ //Add statistics to local cache
+ ConcurrentMap<NodeId, NodeStatistics> cache = this.statisticsManager.getStatisticsCache();
+ if(!cache.containsKey(notification.getId())){
+ cache.put(notification.getId(), new NodeStatistics());
+ }
+
+ List<QueueIdAndStatisticsMap> queuesStats = notification.getQueueIdAndStatisticsMap();
+ for(QueueIdAndStatisticsMap swQueueStats : queuesStats){
+
+ if(!cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().containsKey(swQueueStats.getNodeConnectorId())){
+ cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap().put(swQueueStats.getNodeConnectorId(), new HashMap<QueueId,GenericQueueStatistics>());
+ }
+
+ FlowCapableNodeConnectorQueueStatisticsDataBuilder queueStatisticsDataBuilder = new FlowCapableNodeConnectorQueueStatisticsDataBuilder();
+
+ FlowCapableNodeConnectorQueueStatisticsBuilder queueStatisticsBuilder = new FlowCapableNodeConnectorQueueStatisticsBuilder();
+
+ queueStatisticsBuilder.fieldsFrom(swQueueStats);
+
+ queueStatisticsDataBuilder.setFlowCapableNodeConnectorQueueStatistics(queueStatisticsBuilder.build());
+
+ cache.get(notification.getId()).getNodeConnectorAndQueuesStatsMap()
+ .get(swQueueStats.getNodeConnectorId())
+ .put(swQueueStats.getQueueId(), queueStatisticsBuilder.build());
+
+
+ DataModificationTransaction it = this.statisticsManager.startChange();