X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FInventoryAndReadAdapter.java;fp=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FInventoryAndReadAdapter.java;h=560d8a1d3f379f13b7d4dedce3f45077caa6d1ad;hp=bbb6673a8e04cf25fa2b43a7fe8ba686b8e36690;hb=15b8c3b464cd1d61c2445cdfeafac2689a620c6d;hpb=1bc01a15b1e7811ee59249eab7e815408518e354 diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java index bbb6673a8e..560d8a1d3f 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java @@ -7,16 +7,10 @@ */ package org.opendaylight.controller.sal.compatibility; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Iterables; import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader; import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; @@ -46,6 +40,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService; @@ -59,6 +56,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; @@ -81,21 +80,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.N import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap; +import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener { private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class); private static final short OPENFLOWV10_TABLE_ID = 0; + private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500; private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider(); private final Map> nodeToNodeConnectorsMap = new ConcurrentHashMap<>(); private List inventoryPublisher = new CopyOnWriteArrayList<>(); private List statisticsPublisher = new CopyOnWriteArrayList<>(); + private Cache> txCache; private OpendaylightFlowTableStatisticsService flowTableStatisticsService; private OpendaylightPortStatisticsService nodeConnectorStatisticsService; @@ -171,6 +185,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI public void startAdapter() { inventoryNotificationProvider.setDataProviderService(getDataProviderService()); inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher()); + txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build(); // inventoryNotificationProvider.start(); } @@ -251,22 +266,97 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI @Override public List readAllFlow(final Node node, final boolean cached) { - final ArrayList output = new ArrayList<>(); - final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID); - if (table != null) { - final List flows = table.getFlow(); - LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size()); + final ArrayList ret= new ArrayList<>(); + if (cached) { + final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID); + if (table != null) { + final List flows = table.getFlow(); + LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size()); + + for (final Flow flow : flows) { + final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class); + if (statsFromDataStore != null) { + final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node)); + ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics())); + } + } + } + } else { + LOG.debug("readAllFlow cached:{}", cached); + GetAllFlowStatisticsFromFlowTableInput input = + new GetAllFlowStatisticsFromFlowTableInputBuilder() + .setNode(NodeMapping.toNodeRef(node)) + .setTableId(new TableId(OPENFLOWV10_TABLE_ID)) + .build(); + + Future> future = + getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input); - for (final Flow flow : flows) { - final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class); - if (statsFromDataStore != null) { - final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node)); - output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics())); + RpcResult result = null; + try { + // having a blocking call is fine here, as we need to join + // the notifications and return the result + result = future.get(); + } catch (Exception e) { + LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e); + return ret; + } + + GetAllFlowStatisticsFromFlowTableOutput output = result.getResult(); + if (output == null) { + return ret; + } + + TransactionId transactionId = output.getTransactionId(); + String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node)); + LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey); + + // insert an entry in tempcache, will get updated when notification is received + txCache.put(cacheKey, new TransactionNotificationList( + transactionId, node.getNodeIDString())); + + TransactionNotificationList txnList = + (TransactionNotificationList) txCache.getIfPresent(cacheKey); + + // this loop would not be infinite as the cache will remove an entry + // after defined time if not written to + while (txnList != null && !txnList.areAllNotificationsGathered()) { + LOG.debug("readAllFlow waiting for notification..."); + waitForNotification(); + txnList = (TransactionNotificationList) txCache.getIfPresent(cacheKey); + } + + if (txnList == null) { + return ret; + } + + List notifications = txnList.getNotifications(); + for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) { + List flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList(); + if (flowAndStatisticsMapList != null) { + for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) { + final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node)); + ret.add(addFlowStats(it, flowAndStatistics)); + } } } } + return ret; + } + + private String buildCacheKey(final TransactionId id, final NodeId nodeId) { + return String.valueOf(id.getValue()) + "-" + nodeId.getValue(); + } - return output; + private void waitForNotification() { + try { + // going for a simple sleep approach,as wait-notify on a monitor would require + // us to maintain monitors per txn-node combo + Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS); + LOG.trace("statCollector is waking up from a wait stat Response sleep"); + } catch (final InterruptedException e) { + LOG.warn("statCollector has been interrupted waiting stat Response sleep", e); + } } @Override @@ -623,6 +713,8 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) { statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics); } + + updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies()); } /** @@ -778,4 +870,48 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI private List removeNodeConnectors(final InstanceIdentifier nodeIdentifier) { return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1)); } + + private void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) { + + String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId); + TransactionNotificationList txnList = (TransactionNotificationList) txCache.getIfPresent(cacheKey); + final Optional> optional = Optional.>fromNullable(txnList); + if (optional.isPresent()) { + LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent()); + TransactionNotificationList txn = optional.get(); + txn.addNotification(notification); + txn.setAllNotificationsGathered(lastNotification); + } + } + + private class TransactionNotificationList { + private TransactionId id; + private String nId; + private List notifications; + private boolean allNotificationsGathered; + + public TransactionNotificationList(TransactionId id, String nId) { + this.nId = nId; + this.id = id; + notifications = new ArrayList(); + } + + public void addNotification(T notification) { + notifications.add(notification); + } + + public void setAllNotificationsGathered(boolean allNotificationsGathered) { + this.allNotificationsGathered = allNotificationsGathered; + } + + public boolean areAllNotificationsGathered() { + return allNotificationsGathered; + } + + public List getNotifications() { + return notifications; + } + + } + }