Handle nullable lists
[genius.git] / interfacemanager / interfacemanager-impl / src / main / java / org / opendaylight / genius / interfacemanager / pmcounters / NodeConnectorStatsImpl.java
index f1ab874db2a55df4717c0726cc27e49dd3d56551..3627b6a3bda20add7392705bbfaa7c460f8c5187 100644 (file)
@@ -7,31 +7,27 @@
  */
 package org.opendaylight.genius.interfacemanager.pmcounters;
 
+import static org.opendaylight.genius.interfacemanager.IfmUtil.nullToEmpty;
+import static org.opendaylight.infrautils.utils.concurrent.Executors.newListeningScheduledThreadPool;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
@@ -43,6 +39,7 @@ import org.opendaylight.infrautils.metrics.Counter;
 import org.opendaylight.infrautils.metrics.Labeled;
 import org.opendaylight.infrautils.metrics.MetricDescriptor;
 import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.infrautils.utils.UncheckedCloseable;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
@@ -71,8 +68,8 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
     private static final Logger LOG = LoggerFactory.getLogger(NodeConnectorStatsImpl.class);
 
     private static final int THREAD_POOL_SIZE = 4;
-    private final Set<BigInteger> nodes = ConcurrentHashMap.newKeySet();
-    Map<BigInteger, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
+    private final Set<String> nodes = ConcurrentHashMap.newKeySet();
+    private final Map<String, Set<Counter>> metricsCountersPerNodeMap = new ConcurrentHashMap<>();
     private final OpendaylightDirectStatisticsService opendaylightDirectStatisticsService;
     private final ScheduledExecutorService portStatExecutorService;
     private final EntityOwnershipUtils entityOwnershipUtils;
@@ -100,8 +97,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
         this.ifmConfig = ifmConfigObj;
         this.metricProvider = metricProvider;
         registerListener(LogicalDatastoreType.OPERATIONAL, dataBroker);
-        portStatExecutorService = Executors.newScheduledThreadPool(THREAD_POOL_SIZE,
-            getThreadFactory("Port Stats " + "Request Task"));
+        portStatExecutorService = newListeningScheduledThreadPool(THREAD_POOL_SIZE, "Port Stats Request Task", LOG);
     }
 
     @Override
@@ -118,7 +114,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
     @PreDestroy
     public void close() {
         // close the nested counter objects for each node
-        metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(counter -> counter.close()));
+        metricsCountersPerNodeMap.values().forEach(counterSet -> counterSet.forEach(UncheckedCloseable::close));
     }
 
     /*
@@ -157,17 +153,14 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Requesting port stats - {}");
             }
-            for (BigInteger node : nodes) {
+            for (String node : nodes) {
                 LOG.trace("Requesting AllNodeConnectorStatistics and flow table statistics for node - {}", node);
                 // Call RPC to Get NodeConnector Stats for node
-                Future<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture = opendaylightDirectStatisticsService
-                        .getNodeConnectorStatistics(buildGetNodeConnectorStatisticsInput(node));
-                //Create ListenableFuture to get RPC result asynchronously
-                ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsListenableFuture =
-                        JdkFutureAdapters.listenInPoolThread(ncStatsFuture);
+                ListenableFuture<RpcResult<GetNodeConnectorStatisticsOutput>> ncStatsFuture =
+                        opendaylightDirectStatisticsService.getNodeConnectorStatistics(
+                            buildGetNodeConnectorStatisticsInput(node));
 
-                Futures.addCallback(ncStatsListenableFuture, new
-                        FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
+                Futures.addCallback(ncStatsFuture, new FutureCallback<RpcResult<GetNodeConnectorStatisticsOutput>>() {
 
                     @Override
                     public void onFailure(@Nonnull Throwable error) {
@@ -190,14 +183,10 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
                 }, MoreExecutors.directExecutor());
 
                 // Call RPC to Get flow stats for node
-                Future<RpcResult<GetFlowStatisticsOutput>>  flowStatsFuture = opendaylightDirectStatisticsService
-                        .getFlowStatistics(buildGetFlowStatisticsInput(node));
-                //Create ListenableFuture to get RPC result asynchronously
-                ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsListenableFuture =
-                        JdkFutureAdapters.listenInPoolThread(flowStatsFuture);
+                ListenableFuture<RpcResult<GetFlowStatisticsOutput>> flowStatsFuture =
+                        opendaylightDirectStatisticsService.getFlowStatistics(buildGetFlowStatisticsInput(node));
 
-                Futures.addCallback(flowStatsListenableFuture, new
-                        FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
+                Futures.addCallback(flowStatsFuture, new FutureCallback<RpcResult<GetFlowStatisticsOutput>>() {
 
                     @Override
                     public void onFailure(@Nonnull Throwable error) {
@@ -239,32 +228,24 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
         /**
          * This method builds GetNodeConnectorStatisticsInput which is input for NodeConnectorStatistics RPC.
          */
-        private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(BigInteger dpId) {
+        private GetNodeConnectorStatisticsInput buildGetNodeConnectorStatisticsInput(String dpId) {
             return new GetNodeConnectorStatisticsInputBuilder()
                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
-                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
                     .build();
         }
 
         /**
          * This method builds GetFlowStatisticsInput which is input for FlowStatistics RPC.
          */
-        private GetFlowStatisticsInput buildGetFlowStatisticsInput(BigInteger dpId) {
+        private GetFlowStatisticsInput buildGetFlowStatisticsInput(String dpId) {
             return new GetFlowStatisticsInputBuilder()
                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
-                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId.toString()))).build()))
+                            .child(Node.class, new NodeKey(new NodeId("openflow:" + dpId))).build()))
                     .build();
         }
     }
 
-    private ThreadFactory getThreadFactory(String threadNameFormat) {
-        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-        builder.setNameFormat(threadNameFormat);
-        builder.setUncaughtExceptionHandler((thread, exception) -> LOG
-                .error("Received Uncaught Exception event in Thread: {}", thread.getName(), exception));
-        return builder.build();
-    }
-
     /**
      * This method processes NodeConnectorStatistics RPC result.
      * It performs:
@@ -273,18 +254,18 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
      * - set counter with values fetched from NodeConnectorStatistics
      */
     private void processNodeConnectorStatistics(GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput,
-                                                BigInteger dpid) {
+                                                String dpid) {
         String port = "";
         String portUuid = "";
         List<NodeConnectorStatisticsAndPortNumberMap> ncStatsAndPortMapList = nodeConnectorStatisticsOutput
                         .getNodeConnectorStatisticsAndPortNumberMap();
         // Parse NodeConnectorStatistics and create/update counters for them
-        for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : ncStatsAndPortMapList) {
+        for (NodeConnectorStatisticsAndPortNumberMap ncStatsAndPortMap : nullToEmpty(ncStatsAndPortMapList)) {
             NodeConnectorId nodeConnector = ncStatsAndPortMap.getNodeConnectorId();
-            LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid.toString());
+            LOG.trace("Create/update metric counter for NodeConnector: {} of node: {}", nodeConnector, dpid);
             port = nodeConnector.getValue();
             // update port name as per port name maintained in portNameCache
-            String portNameInCache = "openflow" + ":" + dpid.toString() + ":" + port;
+            String portNameInCache = "openflow" + ":" + dpid + ":" + port;
             java.util.Optional<String> portName = portNameCache.get(portNameInCache);
             if (portName.isPresent()) {
                 Optional<List<InterfaceChildEntry>> interfaceChildEntries = interfaceChildCache
@@ -343,16 +324,16 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
      * - creates/updates Flow table counters using Infrautils metrics API
      * - set counter with values fetched from FlowStatistics
      */
-    private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, BigInteger dpid) {
+    private void processFlowStatistics(GetFlowStatisticsOutput flowStatsOutput, String dpid) {
         Map<Short, AtomicInteger> flowTableMap = new HashMap<>();
         // Get all flows for node from RPC result
         List<FlowAndStatisticsMapList> flowTableAndStatisticsMapList = flowStatsOutput.getFlowAndStatisticsMapList();
-        for (FlowAndStatisticsMapList flowAndStatisticsMap : flowTableAndStatisticsMapList) {
+        for (FlowAndStatisticsMapList flowAndStatisticsMap : nullToEmpty(flowTableAndStatisticsMapList)) {
             short tableId = flowAndStatisticsMap.getTableId();
             // populate map to maintain flow count per table
             flowTableMap.computeIfAbsent(tableId, key -> new AtomicInteger(0)).incrementAndGet();
         }
-        LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid.toString());
+        LOG.trace("FlowTableStatistics (tableId:counter): {} for node: {}", flowTableMap.entrySet(), dpid);
         for (Map.Entry<Short, AtomicInteger> flowTable : flowTableMap.entrySet()) {
             Short tableId = flowTable.getKey();
             AtomicInteger flowCount = flowTable.getValue();
@@ -373,7 +354,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
      * @param tableId table-id value of switch
      * @return counter object
      */
-    private Counter getCounter(String counterName, BigInteger switchId, String port, String aliasId, String tableId) {
+    private Counter getCounter(String counterName, String switchId, String port, String aliasId, String tableId) {
         /*
          * Pattern to be followed for key generation:
          *
@@ -389,7 +370,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
                         CounterConstants.LBL_KEY_PORTID, CounterConstants.LBL_KEY_ALIASID,
                         CounterConstants.LBL_KEY_COUNTER_NAME);
-            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId.toString())
+            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_PORT).label(switchId)
                     .label(port).label(aliasId).label(counterName);
         }
         if (tableId != null) {
@@ -398,7 +379,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
                         .module("interfacemanager").id(CounterConstants.CNT_TYPE_ENTITY_CNT_ID).build(),
                         CounterConstants.LBL_KEY_ENTITY_TYPE, CounterConstants.LBL_KEY_SWITCHID,
                         CounterConstants.LBL_KEY_FLOWTBLID, CounterConstants.LBL_KEY_COUNTER_NAME);
-            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId.toString())
+            counter = labeledCounter.label(CounterConstants.LBL_VAL_ENTITY_TYPE_FLOWTBL).label(switchId)
                     .label(tableId).label(counterName);
         }
 
@@ -427,15 +408,14 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
     @Override
     protected void remove(InstanceIdentifier<Node> identifier, Node node) {
         NodeId nodeId = node.getId();
-        String nodeVal = nodeId.getValue().split(":")[1];
-        BigInteger dpId = new BigInteger(nodeVal);
+        String dpId = nodeId.getValue().split(":")[1];
         if (nodes.contains(dpId)) {
             nodes.remove(dpId);
             // remove counters set from node
             Set<Counter> nodeMetricCounterSet = metricsCountersPerNodeMap.remove(dpId);
             if (nodeMetricCounterSet != null) {
                 // remove counters
-                nodeMetricCounterSet.forEach(counter -> counter.close());
+                nodeMetricCounterSet.forEach(UncheckedCloseable::close);
             }
         }
         if (nodes.size() > 0) {
@@ -456,7 +436,7 @@ public class NodeConnectorStatsImpl extends AsyncClusteredDataTreeChangeListener
         NodeId nodeId = node.getId();
         if (entityOwnershipUtils.isEntityOwner(IfmConstants.SERVICE_ENTITY_TYPE, nodeId.getValue())) {
             LOG.trace("Locally connected switch {}",nodeId.getValue());
-            BigInteger dpId = new BigInteger(nodeId.getValue().split(":")[1]);
+            String dpId = nodeId.getValue().split(":")[1];
             if (nodes.contains(dpId)) {
                 return;
             }