Merge "Bug-2098- sal-compatibility not get up-to-date flow information"
authorMoiz Raja <moraja@cisco.com>
Mon, 27 Oct 2014 23:20:21 +0000 (23:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 27 Oct 2014 23:20:21 +0000 (23:20 +0000)
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java
opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.java

index bbb6673a8e04cf25fa2b43a7fe8ba686b8e36690..560d8a1d3f379f13b7d4dedce3f45077caa6d1ad 100644 (file)
@@ -7,16 +7,10 @@
  */
 package org.opendaylight.controller.sal.compatibility;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterables;
 
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
@@ -46,6 +40,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
@@ -59,6 +56,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
@@ -81,21 +80,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
     private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
     private static final short OPENFLOWV10_TABLE_ID = 0;
+    private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
 
     private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
     private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
     private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
     private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+    private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
 
     private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
     private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
@@ -171,6 +185,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     public void startAdapter() {
         inventoryNotificationProvider.setDataProviderService(getDataProviderService());
         inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+        txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
         // inventoryNotificationProvider.start();
     }
 
@@ -251,22 +266,97 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
 
     @Override
     public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
-        final ArrayList<FlowOnNode> output = new ArrayList<>();
-        final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
-        if (table != null) {
-            final List<Flow> flows = table.getFlow();
-            LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+        final ArrayList<FlowOnNode> ret= new ArrayList<>();
+        if (cached) {
+            final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+            if (table != null) {
+                final List<Flow> flows = table.getFlow();
+                LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+                for (final Flow flow : flows) {
+                    final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+                    if (statsFromDataStore != null) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+                        ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+                    }
+                }
+            }
+        } else {
+            LOG.debug("readAllFlow cached:{}", cached);
+            GetAllFlowStatisticsFromFlowTableInput input =
+                new GetAllFlowStatisticsFromFlowTableInputBuilder()
+                    .setNode(NodeMapping.toNodeRef(node))
+                    .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+                    .build();
+
+            Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+                getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
 
-            for (final Flow flow : flows) {
-                final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
-                if (statsFromDataStore != null) {
-                    final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
-                    output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+            RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+            try {
+                // having a blocking call is fine here, as we need to join
+                // the notifications and return the result
+                result = future.get();
+            } catch (Exception e) {
+               LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+               return ret;
+            }
+
+            GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+            if (output == null) {
+                return ret;
+            }
+
+            TransactionId transactionId = output.getTransactionId();
+            String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+            LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
+
+            // insert an entry in tempcache, will get updated when notification is received
+            txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+                transactionId, node.getNodeIDString()));
+
+            TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+                (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+            // this loop would not be infinite as the cache will remove an entry
+            // after defined time if not written to
+            while (txnList != null && !txnList.areAllNotificationsGathered()) {
+                LOG.debug("readAllFlow waiting for notification...");
+                waitForNotification();
+                txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+            }
+
+            if (txnList == null) {
+                return ret;
+            }
+
+            List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+            for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+                List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+                if (flowAndStatisticsMapList != null) {
+                    for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+                        final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+                        ret.add(addFlowStats(it, flowAndStatistics));
+                    }
                 }
             }
         }
+        return ret;
+    }
+
+    private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+        return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+    }
 
-        return output;
+    private void waitForNotification() {
+        try {
+            // going for a simple sleep approach,as wait-notify on a monitor would require
+            // us to maintain monitors per txn-node combo
+            Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+            LOG.trace("statCollector is waking up from a wait stat Response sleep");
+        } catch (final InterruptedException e) {
+            LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+        }
     }
 
     @Override
@@ -623,6 +713,8 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
         for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
             statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
         }
+
+        updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
     }
 
     /**
@@ -778,4 +870,48 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI
     private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
         return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
     }
+
+    private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+        String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+        TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+        final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+        if (optional.isPresent()) {
+            LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+            TransactionNotificationList<T> txn = optional.get();
+            txn.addNotification(notification);
+            txn.setAllNotificationsGathered(lastNotification);
+        }
+    }
+
+    private class TransactionNotificationList<T extends TransactionAware> {
+        private TransactionId id;
+        private String nId;
+        private List<T> notifications;
+        private boolean allNotificationsGathered;
+
+        public TransactionNotificationList(TransactionId id, String nId) {
+            this.nId = nId;
+            this.id = id;
+            notifications = new ArrayList<T>();
+        }
+
+        public void addNotification(T notification) {
+            notifications.add(notification);
+        }
+
+        public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+            this.allNotificationsGathered = allNotificationsGathered;
+        }
+
+        public boolean areAllNotificationsGathered() {
+            return allNotificationsGathered;
+        }
+
+        public List<T> getNotifications() {
+            return notifications;
+        }
+
+    }
+
 }
index bcb2367e7a35685b8a660bf9b7d2a00c9effcb1b..2bc3e603096bc6c7ef75affef7b8214caf239b70 100644 (file)
@@ -10,11 +10,6 @@ package org.opendaylight.controller.sal.compatibility;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import java.math.BigInteger;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.regex.Pattern;
 import org.opendaylight.controller.sal.common.util.Arguments;
 import org.opendaylight.controller.sal.core.AdvertisedBandwidth;
 import org.opendaylight.controller.sal.core.Bandwidth;
@@ -65,6 +60,12 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+
 public final class NodeMapping {
 
     private static final Logger LOG = LoggerFactory
@@ -167,7 +168,7 @@ public final class NodeMapping {
      * @param aDNode
      * @return
      */
-    private static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
+    public static NodeId toNodeId(org.opendaylight.controller.sal.core.Node aDNode) {
         String targetPrefix = null;
         if (NodeIDType.OPENFLOW.equals(aDNode.getType())) {
                 targetPrefix = OPENFLOW_ID_PREFIX;