Migrate ListenableFutures.addErrorLogging() users
[netvirt.git] / qosservice / impl / src / main / java / org / opendaylight / netvirt / qosservice / QosAlertManager.java
index b302d29bca69ea5ec1e5dcf94b528610ee288d3c..3cf86728527b9d66688e17d27fa7612640807dcf 100644 (file)
@@ -8,12 +8,15 @@
 
 package org.opendaylight.netvirt.qosservice;
 
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.math.BigInteger;
-import java.util.List;
+import static org.opendaylight.genius.infra.Datastore.CONFIGURATION;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -23,17 +26,17 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.apache.felix.service.command.CommandSession;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunner;
+import org.opendaylight.genius.infra.ManagedNewTransactionRunnerImpl;
 import org.opendaylight.genius.interfacemanager.globals.IfmConstants;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
-import org.opendaylight.netvirt.neutronvpn.interfaces.INeutronVpnManager;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.Uuid;
+import org.opendaylight.genius.interfacemanager.globals.InterfaceInfo;
+import org.opendaylight.genius.interfacemanager.interfaces.IInterfaceManager;
+import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
+import org.opendaylight.mdsal.binding.api.DataBroker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetNodeConnectorStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.OpendaylightDirectStatisticsService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
@@ -41,12 +44,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfig;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netvirt.qosalert.config.rev170301.QosalertConfigBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.networks.rev150712.networks.attributes.networks.Network;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.ports.rev150712.ports.attributes.ports.Port;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMapKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.Uint64;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,31 +58,19 @@ import org.slf4j.LoggerFactory;
 public final class QosAlertManager implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(QosAlertManager.class);
 
-    private static final FutureCallback<Void> DEFAULT_FUTURE_CALLBACK = new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-            LOG.debug("Datastore operation completed successfully");
-        }
-
-        @Override
-        public void onFailure(Throwable error) {
-            LOG.error("Error in datastore operation {}", error);
-        }
-
-    };
-
     private volatile boolean alertEnabled;
     private volatile int pollInterval;
     private volatile Thread thread;
     private volatile boolean statsPollThreadStart;
 
-    private final DataBroker dataBroker;
+    private final ManagedNewTransactionRunner txRunner;
     private final QosalertConfig defaultConfig;
     private final OpendaylightDirectStatisticsService odlDirectStatisticsService;
     private final QosNeutronUtils qosNeutronUtils;
     private final QosEosHandler qosEosHandler;
-    private final INeutronVpnManager neutronVpnManager;
-    private final ConcurrentMap<BigInteger, ConcurrentMap<String, QosAlertPortData>> qosAlertDpnPortNumberMap =
+    private final IInterfaceManager interfaceManager;
+    private final Set unprocessedInterfaceIds = ConcurrentHashMap.newKeySet();
+    private final ConcurrentMap<Uint64, ConcurrentMap<String, QosAlertPortData>> qosAlertDpnPortNumberMap =
             new ConcurrentHashMap<>();
     private final AlertThresholdSupplier alertThresholdSupplier = new AlertThresholdSupplier();
 
@@ -87,16 +78,14 @@ public final class QosAlertManager implements Runnable {
     public QosAlertManager(final DataBroker dataBroker,
             final OpendaylightDirectStatisticsService odlDirectStatisticsService, final QosalertConfig defaultConfig,
             final QosNeutronUtils qosNeutronUtils, final QosEosHandler qosEosHandler,
-            final INeutronVpnManager neutronVpnManager) {
-
-        LOG.debug("{} created",  getClass().getSimpleName());
-        this.dataBroker = dataBroker;
+            final IInterfaceManager interfaceManager) {
+        this.txRunner = new ManagedNewTransactionRunnerImpl(dataBroker);
         this.odlDirectStatisticsService = odlDirectStatisticsService;
+        this.interfaceManager = interfaceManager;
         this.defaultConfig = defaultConfig;
         this.qosNeutronUtils = qosNeutronUtils;
         this.qosEosHandler = qosEosHandler;
-        this.neutronVpnManager = neutronVpnManager;
-        LOG.debug("QosAlert default config poll alertEnabled:{} threshold:{} pollInterval:{}",
+        LOG.trace("QosAlert default config poll alertEnabled:{} threshold:{} pollInterval:{}",
                 defaultConfig.isQosAlertEnabled(), defaultConfig.getQosDropPacketThreshold(),
                 defaultConfig.getQosAlertPollInterval());
         getDefaultConfig();
@@ -108,7 +97,7 @@ public final class QosAlertManager implements Runnable {
         qosAlertDpnPortNumberMap.clear();
         statsPollThreadStart = true;
         startStatsPollThread();
-        LOG.debug("{} init done", getClass().getSimpleName());
+        LOG.trace("{} init done", getClass().getSimpleName());
     }
 
     @PreDestroy
@@ -117,7 +106,7 @@ public final class QosAlertManager implements Runnable {
         if (thread != null) {
             thread.interrupt();
         }
-        LOG.debug("{} close done", getClass().getSimpleName());
+        LOG.trace("{} close done", getClass().getSimpleName());
     }
 
     private void setQosAlertOwner(boolean isOwner) {
@@ -134,7 +123,7 @@ public final class QosAlertManager implements Runnable {
     public void run() {
         LOG.debug("Qos alert poll thread started");
         while (statsPollThreadStart && alertEnabled) {
-            LOG.debug("Thread loop polling :{} threshold:{} pollInterval:{}",
+            LOG.trace("Thread loop polling :{} threshold:{} pollInterval:{}",
                     alertEnabled, alertThresholdSupplier.get(), pollInterval);
 
             try {
@@ -159,9 +148,9 @@ public final class QosAlertManager implements Runnable {
 
     private void getDefaultConfig() {
         alertEnabled = defaultConfig.isQosAlertEnabled();
-        pollInterval = defaultConfig.getQosAlertPollInterval();
+        pollInterval = defaultConfig.getQosAlertPollInterval().toJava();
 
-        alertThresholdSupplier.set(defaultConfig.getQosDropPacketThreshold().shortValue());
+        alertThresholdSupplier.set(defaultConfig.getQosDropPacketThreshold().toJava());
     }
 
     public void setQosalertConfig(QosalertConfig config) {
@@ -171,7 +160,7 @@ public final class QosAlertManager implements Runnable {
                 config.getQosAlertPollInterval());
 
         alertEnabled = config.isQosAlertEnabled().booleanValue();
-        pollInterval = config.getQosAlertPollInterval();
+        pollInterval = config.getQosAlertPollInterval().toJava();
 
         alertThresholdSupplier.set(config.getQosDropPacketThreshold().shortValue());
 
@@ -208,118 +197,121 @@ public final class QosAlertManager implements Runnable {
         writeConfigDataStore(enable, alertThresholdSupplier.get().shortValue(), pollInterval);
     }
 
-    public void addToQosAlertCache(Port port) {
-        LOG.trace("Adding port {} in cache", port.getUuid());
-
-        BigInteger dpnId = qosNeutronUtils.getDpnForInterface(port.getUuid().getValue());
-
-        if (dpnId.equals(BigInteger.ZERO)) {
-            LOG.debug("DPN ID for port {} not found", port.getUuid());
-            return;
+    public void addInterfaceIdInQoSAlertCache(String ifaceId) {
+        LOG.trace("Adding interface id {} in cache", ifaceId);
+        InterfaceInfo interfaceInfo =
+                interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
+        if (interfaceInfo == null) {
+            LOG.debug("Interface not found {}. Added in cache now to process later ", ifaceId);
+            unprocessedInterfaceIds.add(ifaceId);
+        } else {
+            addToQosAlertCache(interfaceInfo);
         }
-
-        String portNumber = qosNeutronUtils.getPortNumberForInterface(port.getUuid().getValue());
-
-        LOG.trace("Adding DPN ID {} with port {} port number {}", dpnId, port.getUuid(), portNumber);
-
-        qosAlertDpnPortNumberMap.computeIfAbsent(dpnId, key -> new ConcurrentHashMap<>())
-                .put(portNumber, new QosAlertPortData(port, qosNeutronUtils, alertThresholdSupplier));
     }
 
-    public void addToQosAlertCache(Network network) {
-        LOG.trace("Adding network {} in cache", network.getUuid());
+    public void displayConfig(CommandSession session) {
 
-        List<Uuid> subnetIds = qosNeutronUtils.getSubnetIdsFromNetworkId(network.getUuid());
+        session.getConsole().println("Qos Alert Configuration Details");
+        session.getConsole().println("Threshold: " + alertThresholdSupplier.get().shortValue());
+        session.getConsole().println("AlertEnabled: " + alertEnabled);
+        session.getConsole().println("Poll Interval: " + pollInterval);
 
-        for (Uuid subnetId : subnetIds) {
-            List<Uuid> portIds = qosNeutronUtils.getPortIdsFromSubnetId(subnetId);
-            for (Uuid portId : portIds) {
-                Port port = neutronVpnManager.getNeutronPort(portId);
-                if (port != null && !qosNeutronUtils.portHasQosPolicy(port)) {
-                    LOG.trace("Adding network {} port {} in cache", network.getUuid(), port.getUuid());
-                    addToQosAlertCache(port);
+        Uint64 dpnId;
+        String portData;
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        if (qosAlertDpnPortNumberMap.isEmpty()) {
+            session.getConsole().println("\nQosAlert Cache not found\n");
+            return;
+        } else {
+            session.getConsole().println("\nDPN Map");
+            JsonObject jsonObject;
+            JsonArray jsonArray;
+            JsonArray jsonArrayOuter = new JsonArray();
+            for (Entry<Uint64, ConcurrentMap<String, QosAlertPortData>> dpnEntry
+                    : qosAlertDpnPortNumberMap.entrySet()) {
+                dpnId = dpnEntry.getKey();
+                jsonObject = new JsonObject();
+                jsonObject.addProperty("DpnId", dpnId.toString());
+                ConcurrentMap<String, QosAlertPortData> portInnerMap = qosAlertDpnPortNumberMap.get(dpnId);
+                jsonArray = new JsonArray();
+                for (ConcurrentMap.Entry<String, QosAlertPortData> portEntry : portInnerMap.entrySet()) {
+                    portData = "Port_number: " + portEntry.getKey() + ", " + portEntry.getValue();
+                    jsonArray.add(portData);
                 }
+                jsonObject.add("QosAlertPortData Cache", jsonArray);
+                jsonArrayOuter.add(jsonObject);
             }
+            session.getConsole().println(gson.toJson(jsonArrayOuter));
+            session.getConsole().println();
         }
     }
 
-    public void removeFromQosAlertCache(Port port) {
-        LOG.trace("Removing port {} from cache", port.getUuid());
+    public void processInterfaceUpEvent(String ifaceId) {
+        LOG.trace("processInterfaceUpEvent {}", ifaceId);
+        if (unprocessedInterfaceIds.remove(ifaceId)) {
+            addInterfaceIdInQoSAlertCache(ifaceId);
+        }
+    }
 
-        BigInteger dpnId = qosNeutronUtils.getDpnForInterface(port.getUuid().getValue());
+    private void addToQosAlertCache(InterfaceInfo interfaceInfo) {
+        Uint64 dpnId = interfaceInfo.getDpId();
+        if (dpnId.equals(Uint64.valueOf(0L))) {
+            LOG.warn("Interface {} could not be added to Qos Alert Cache because Dpn Id is not found",
+                    interfaceInfo.getInterfaceName());
+            return;
+        }
 
-        if (dpnId.equals(BigInteger.ZERO)) {
-            LOG.debug("DPN ID for port {} not found", port.getUuid());
+        Port port = qosNeutronUtils.getNeutronPort(interfaceInfo.getInterfaceName());
+        if (port == null) {
+            LOG.warn("Port {} not added to Qos Alert Cache because it is not found", interfaceInfo.getInterfaceName());
             return;
         }
 
-        String portNumber = qosNeutronUtils.getPortNumberForInterface(port.getUuid().getValue());
+        String portNumber = String.valueOf(interfaceInfo.getPortNo());
 
-        removeFromQosAlertCache(dpnId, portNumber);
-    }
+        LOG.trace("Adding DPN ID {} with port {} port number {}", dpnId, port.getUuid(), portNumber);
 
-    public void removeFromQosAlertCache(NodeConnectorId nodeConnectorId) {
-        LOG.trace("Removing node connector {} from cache", nodeConnectorId.getValue());
+        qosAlertDpnPortNumberMap.computeIfAbsent(dpnId, key -> new ConcurrentHashMap<>())
+                .put(portNumber, new QosAlertPortData(port, qosNeutronUtils, alertThresholdSupplier));
+    }
 
-        long nodeId = MDSALUtil.getDpnIdFromPortName(nodeConnectorId);
+    public void removeInterfaceIdFromQosAlertCache(String ifaceId) {
 
-        if (nodeId == -1) {
-            LOG.debug("Node ID for node connector {} not found", nodeConnectorId.getValue());
+        LOG.trace("If present, remove interface {} from cache", ifaceId);
+        unprocessedInterfaceIds.remove(ifaceId);
+        InterfaceInfo interfaceInfo =
+                interfaceManager.getInterfaceInfoFromOperationalDataStore(ifaceId);
+        if (interfaceInfo == null) {
             return;
         }
-
-        BigInteger dpnId = new BigInteger(String.valueOf(nodeId));
-
-        long portId = MDSALUtil.getOfPortNumberFromPortName(nodeConnectorId);
-
-        String portNumber = String.valueOf(portId);
-
+        Uint64 dpnId = interfaceInfo.getDpId();
+        String portNumber = String.valueOf(interfaceInfo.getPortNo());
         removeFromQosAlertCache(dpnId, portNumber);
     }
 
-    private void removeFromQosAlertCache(BigInteger dpnId, String portNumber) {
-        boolean removed = false;
-        ConcurrentMap<String, QosAlertPortData> portDataMap = qosAlertDpnPortNumberMap.get(dpnId);
-        if (portDataMap != null) {
-            removed = portDataMap.remove(portNumber) != null;
-            if (portDataMap.isEmpty()) {
-                LOG.trace("DPN {} empty. Removing from cache", dpnId);
-                qosAlertDpnPortNumberMap.remove(dpnId, portDataMap);
-            }
-        }
-
-        if (removed) {
-            LOG.trace("Removed DPN {} port number {} from cache", dpnId, portNumber);
-        } else {
-            LOG.trace("DPN {} port number {} not found in cache", dpnId, portNumber);
+    public void removeLowerLayerIfFromQosAlertCache(String lowerLayerIf) {
+        LOG.trace("If present, remove lowerLayerIf {} from cache", lowerLayerIf);
+        Uint64 dpnId = qosNeutronUtils.getDpnIdFromLowerLayerIf(lowerLayerIf);
+        String portNumber = qosNeutronUtils.getPortNumberFromLowerLayerIf(lowerLayerIf);
+        if (dpnId == null || portNumber == null) {
+            LOG.warn("Interface {} not in openflow:dpnid:portnum format, could not remove from cache", lowerLayerIf);
+            return;
         }
+        removeFromQosAlertCache(dpnId, portNumber);
     }
 
-    public void removeFromQosAlertCache(Network network) {
-        LOG.trace("Removing network {} from cache", network.getUuid());
-
-        List<Uuid> subnetIds = qosNeutronUtils.getSubnetIdsFromNetworkId(network.getUuid());
-
-        for (Uuid subnetId : subnetIds) {
-            List<Uuid> portIds = qosNeutronUtils.getPortIdsFromSubnetId(subnetId);
-            for (Uuid portId : portIds) {
-                Port port = neutronVpnManager.getNeutronPort(portId);
-                if (port != null && !qosNeutronUtils.portHasQosPolicy(port)) {
-                    LOG.trace("Removing network {} port {} from cache", network.getUuid(), port.getUuid());
-                    removeFromQosAlertCache(port);
-                }
+    private void removeFromQosAlertCache(Uint64 dpnId, String portNumber) {
+        if (qosAlertDpnPortNumberMap.containsKey(dpnId)
+                && qosAlertDpnPortNumberMap.get(dpnId).containsKey(portNumber)) {
+            qosAlertDpnPortNumberMap.get(dpnId).remove(portNumber);
+            LOG.trace("Removed interace {}:{} from cache", dpnId, portNumber);
+            if (qosAlertDpnPortNumberMap.get(dpnId).isEmpty()) {
+                LOG.trace("DPN {} empty. Removing dpn from cache", dpnId);
+                qosAlertDpnPortNumberMap.remove(dpnId);
             }
         }
     }
 
-    private static <T extends DataObject> void asyncWrite(LogicalDatastoreType datastoreType,
-                                                          InstanceIdentifier<T> path, T data, DataBroker broker,
-                                                          FutureCallback<Void> callback) {
-        WriteTransaction tx = broker.newWriteOnlyTransaction();
-        tx.put(datastoreType, path, data, WriteTransaction.CREATE_MISSING_PARENTS);
-        Futures.addCallback(tx.submit(), callback, MoreExecutors.directExecutor());
-    }
-
     private void writeConfigDataStore(boolean qosAlertEnabled, short dropPacketThreshold, int alertPollInterval) {
 
         InstanceIdentifier<QosalertConfig> path = InstanceIdentifier.builder(QosalertConfig.class).build();
@@ -330,15 +322,16 @@ public final class QosAlertManager implements Runnable {
                 .setQosAlertPollInterval(alertPollInterval)
                 .build();
 
-        asyncWrite(LogicalDatastoreType.CONFIGURATION, path, qosAlertConfig, dataBroker,
-                DEFAULT_FUTURE_CALLBACK);
+        LoggingFutures.addErrorLogging(txRunner.callWithNewWriteOnlyTransactionAndSubmit(CONFIGURATION,
+            tx -> tx.mergeParentStructurePut(path,
+                    qosAlertConfig)), LOG, "Error writing to the config data store");
     }
 
     private void pollDirectStatisticsForAllNodes() {
         LOG.trace("Polling direct statistics from nodes");
 
-        for (Entry<BigInteger, ConcurrentMap<String, QosAlertPortData>> entry : qosAlertDpnPortNumberMap.entrySet()) {
-            BigInteger dpn = entry.getKey();
+        for (Entry<Uint64, ConcurrentMap<String, QosAlertPortData>> entry : qosAlertDpnPortNumberMap.entrySet()) {
+            Uint64 dpn = entry.getKey();
             LOG.trace("Polling DPN ID {}", dpn);
             GetNodeConnectorStatisticsInputBuilder input = new GetNodeConnectorStatisticsInputBuilder()
                     .setNode(new NodeRef(InstanceIdentifier.builder(Nodes.class)
@@ -351,24 +344,30 @@ public final class QosAlertManager implements Runnable {
             try {
                 rpcResult = rpcResultFuture.get();
             } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Exception {} occurred with node {} Direct-Statistics get", e, dpn);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Could not get Direct-Statistics for node {} Exception occurred ", dpn, e);
+                } else {
+                    LOG.info("Could not get Direct-Statistics for node {}", dpn);
+                }
             }
             if (rpcResult != null && rpcResult.isSuccessful() && rpcResult.getResult() != null) {
 
                 GetNodeConnectorStatisticsOutput nodeConnectorStatisticsOutput = rpcResult.getResult();
 
-                List<NodeConnectorStatisticsAndPortNumberMap> nodeConnectorStatisticsAndPortNumberMapList =
-                        nodeConnectorStatisticsOutput.getNodeConnectorStatisticsAndPortNumberMap();
+                Map<NodeConnectorStatisticsAndPortNumberMapKey, NodeConnectorStatisticsAndPortNumberMap>
+                        nodeConnectorStatisticsAndPortNumberMap =
+                        nodeConnectorStatisticsOutput.nonnullNodeConnectorStatisticsAndPortNumberMap();
 
                 ConcurrentMap<String, QosAlertPortData> portDataMap = entry.getValue();
-                for (NodeConnectorStatisticsAndPortNumberMap stats : nodeConnectorStatisticsAndPortNumberMapList) {
+                for (NodeConnectorStatisticsAndPortNumberMap stats
+                        : nodeConnectorStatisticsAndPortNumberMap.values()) {
                     QosAlertPortData portData = portDataMap.get(stats.getNodeConnectorId().getValue());
                     if (portData != null) {
                         portData.updatePortStatistics(stats);
                     }
                 }
             } else {
-                LOG.error("Direct-Statistics not available for node {}", dpn);
+                LOG.info("Direct-Statistics not available for node {}", dpn);
             }
 
         }
@@ -379,15 +378,15 @@ public final class QosAlertManager implements Runnable {
                 .forEach(QosAlertPortData::initPortData));
     }
 
-    private static class AlertThresholdSupplier implements Supplier<BigInteger> {
-        private volatile BigInteger alertThreshold = BigInteger.valueOf(0);
+    private static class AlertThresholdSupplier implements Supplier<Uint64> {
+        private volatile Uint64 alertThreshold = Uint64.valueOf(0);
 
         void set(short threshold) {
-            alertThreshold = BigInteger.valueOf(threshold);
+            alertThreshold = Uint64.valueOf(threshold);
         }
 
         @Override
-        public BigInteger get() {
+        public Uint64 get() {
             return alertThreshold;
         }
     }