Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / InventoryAndReadAdapter.java
index e2c13867754d187f26f9f9a1dc4414a2b4114572..560d8a1d3f379f13b7d4dedce3f45077caa6d1ad 100644 (file)
@@ -7,19 +7,11 @@
  */
 package org.opendaylight.controller.sal.compatibility;
 
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Iterables;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
@@ -48,7 +40,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -62,6 +56,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
@@ -71,7 +67,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
@@ -85,19 +80,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightInventoryListener, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
     private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
     private static final short OPENFLOWV10_TABLE_ID = 0;
+    private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
 
     private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
     private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
     private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
     private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+    private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
 
     private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
     private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
@@ -173,6 +185,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     public void startAdapter() {
         inventoryNotificationProvider.setDataProviderService(getDataProviderService());
         inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+        txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
         // inventoryNotificationProvider.start();
     }
 
@@ -241,40 +254,109 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
      * @param id Table id
      * @return Table contents, or null if not present
      */
-    private Table readConfigTable(final Node node, final short id) {
+    private Table readOperationalTable(final Node node, final short id) {
         final InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class)
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node))
                 .augmentation(FlowCapableNode.class)
                 .child(Table.class, new TableKey(id))
                 .build();
 
-        return (Table) startChange().readConfigurationData(tableRef);
+        return (Table) startChange().readOperationalData(tableRef);
     }
 
     @Override
     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
-        final ArrayList<FlowOnNode> output = new ArrayList<>();
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
-        if (table != null) {
-            final List<Flow> flows = table.getFlow();
-            LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+        final ArrayList<FlowOnNode> ret= new ArrayList<>();
+        if (cached) {
+            final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+            if (table != null) {
+                final List<Flow> flows = table.getFlow();
+                LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+                for (final Flow flow : flows) {
+                    final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+                    if (statsFromDataStore != null) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+                        ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+                    }
+                }
+            }
+        } else {
+            LOG.debug("readAllFlow cached:{}", cached);
+            GetAllFlowStatisticsFromFlowTableInput input =
+                new GetAllFlowStatisticsFromFlowTableInputBuilder()
+                    .setNode(NodeMapping.toNodeRef(node))
+                    .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+                    .build();
+
+            Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+                getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
+
+            RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+            try {
+                // having a blocking call is fine here, as we need to join
+                // the notifications and return the result
+                result = future.get();
+            } catch (Exception e) {
+               LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+               return ret;
+            }
+
+            GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+            if (output == null) {
+                return ret;
+            }
+
+            TransactionId transactionId = output.getTransactionId();
+            String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+            LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
 
-            for (final Flow flow : flows) {
-                final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
-                if (statsFromDataStore != null) {
-                    final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
-                    output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+            // insert an entry in tempcache, will get updated when notification is received
+            txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+                transactionId, node.getNodeIDString()));
+
+            TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+                (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+            // this loop would not be infinite as the cache will remove an entry
+            // after defined time if not written to
+            while (txnList != null && !txnList.areAllNotificationsGathered()) {
+                LOG.debug("readAllFlow waiting for notification...");
+                waitForNotification();
+                txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+            }
+
+            if (txnList == null) {
+                return ret;
+            }
+
+            List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+            for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+                List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+                if (flowAndStatisticsMapList != null) {
+                    for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+                        ret.add(addFlowStats(it, flowAndStatistics));
+                    }
                 }
             }
         }
+        return ret;
+    }
 
-        // TODO (main): Shall we send request to the switch? It will make async request to the switch.
-        // Once the plugin receives a response, it will let the adaptor know through onFlowStatisticsUpdate()
-        // If we assume that md-sal statistics manager will always be running, then it is not required
-        // But if not, then sending request will collect the latest data for adaptor at least.
-        getFlowStatisticsService().getAllFlowsStatisticsFromAllFlowTables(
-                new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder().setNode(NodeMapping.toNodeRef(node)).build());
-        return output;
+    private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+        return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+    }
+
+    private void waitForNotification() {
+        try {
+            // going for a simple sleep approach,as wait-notify on a monitor would require
+            // us to maintain monitors per txn-node combo
+            Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+            LOG.trace("statCollector is waking up from a wait stat Response sleep");
+        } catch (final InterruptedException e) {
+            LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+        }
     }
 
     @Override
@@ -334,7 +416,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) {
         FlowOnNode ret = null;
-        final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
         if (table != null) {
             final List<Flow> flows = table.getFlow();
             InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@@ -386,7 +468,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     @Override
     public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) {
         NodeTableStatistics nodeStats = null;
-        final Table table = readConfigTable(nodeTable.getNode(), (short) nodeTable.getID());
+        final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID());
         if (table != null) {
             final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
             if (tableStats != null) {
@@ -405,13 +487,11 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         return nodeStats;
     }
 
-    @Override
-    public void onNodeConnectorRemoved(final NodeConnectorRemoved update) {
+    public void onNodeConnectorRemovedInternal(final NodeConnectorRemoved update) {
         // Never received
     }
 
-    @Override
-    public void onNodeRemoved(final NodeRemoved notification) {
+    public void onNodeRemovedInternal(final NodeRemoved notification) {
         this.removeNodeConnectors(notification.getNodeRef().getValue());
         try {
             final Node aDNode = NodeMapping.toADNode(notification.getNodeRef());
@@ -421,8 +501,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         }
     }
 
-    @Override
-    public void onNodeConnectorUpdated(final NodeConnectorUpdated update) {
+    public void onNodeConnectorUpdatedInternal(final NodeConnectorUpdated update) {
         final NodeConnectorRef ref = update.getNodeConnectorRef();
         final UpdateType updateType;
         if (!this.isKnownNodeConnector(ref.getValue())) {
@@ -441,8 +520,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         }
     }
 
-    @Override
-    public void onNodeUpdated(final NodeUpdated notification) {
+    public void onNodeUpdatedInternal(final NodeUpdated notification) {
         final NodeRef ref = notification.getNodeRef();
 
         final UpdateType updateType;
@@ -635,6 +713,8 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
             statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
         }
+
+        updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
     }
 
     /**
@@ -790,4 +870,48 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
         return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
     }
+
+    private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+        String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+        TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+        final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+        if (optional.isPresent()) {
+            LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+            TransactionNotificationList<T> txn = optional.get();
+            txn.addNotification(notification);
+            txn.setAllNotificationsGathered(lastNotification);
+        }
+    }
+
+    private class TransactionNotificationList<T extends TransactionAware> {
+        private TransactionId id;
+        private String nId;
+        private List<T> notifications;
+        private boolean allNotificationsGathered;
+
+        public TransactionNotificationList(TransactionId id, String nId) {
+            this.nId = nId;
+            this.id = id;
+            notifications = new ArrayList<T>();
+        }
+
+        public void addNotification(T notification) {
+            notifications.add(notification);
+        }
+
+        public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+            this.allNotificationsGathered = allNotificationsGathered;
+        }
+
+        public boolean areAllNotificationsGathered() {
+            return allNotificationsGathered;
+        }
+
+        public List<T> getNotifications() {
+            return notifications;
+        }
+
+    }
+
 }