X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FInventoryAndReadAdapter.java;h=560d8a1d3f379f13b7d4dedce3f45077caa6d1ad;hp=e2c13867754d187f26f9f9a1dc4414a2b4114572;hb=b5167b9bc04f2792b275cfe0eac78c0f5eb9442d;hpb=7d753ff9887cb803bdcd222aec2ab2a0a9c87906 diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java index e2c1386775..560d8a1d3f 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/InventoryAndReadAdapter.java @@ -7,19 +7,11 @@ */ package org.opendaylight.controller.sal.compatibility; +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; - import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader; import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; @@ -48,7 +40,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService; @@ -62,6 +56,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; @@ -71,7 +67,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; @@ -85,19 +80,36 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.N import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap; +import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightInventoryListener, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener { +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener { private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class); private static final short OPENFLOWV10_TABLE_ID = 0; + private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500; private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider(); private final Map> nodeToNodeConnectorsMap = new ConcurrentHashMap<>(); private List inventoryPublisher = new CopyOnWriteArrayList<>(); private List statisticsPublisher = new CopyOnWriteArrayList<>(); + private Cache> txCache; private OpendaylightFlowTableStatisticsService flowTableStatisticsService; private OpendaylightPortStatisticsService nodeConnectorStatisticsService; @@ -173,6 +185,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI public void startAdapter() { inventoryNotificationProvider.setDataProviderService(getDataProviderService()); inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher()); + txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build(); // inventoryNotificationProvider.start(); } @@ -241,40 +254,109 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI * @param id Table id * @return Table contents, or null if not present */ - private Table readConfigTable(final Node node, final short id) { + private Table readOperationalTable(final Node node, final short id) { final InstanceIdentifier tableRef = InstanceIdentifier.builder(Nodes.class) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node)) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node)) .augmentation(FlowCapableNode.class) .child(Table.class, new TableKey(id)) .build(); - return (Table) startChange().readConfigurationData(tableRef); + return (Table) startChange().readOperationalData(tableRef); } @Override public List readAllFlow(final Node node, final boolean cached) { - final ArrayList output = new ArrayList<>(); - final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID); - if (table != null) { - final List flows = table.getFlow(); - LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size()); + final ArrayList ret= new ArrayList<>(); + if (cached) { + final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID); + if (table != null) { + final List flows = table.getFlow(); + LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size()); + + for (final Flow flow : flows) { + final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class); + if (statsFromDataStore != null) { + final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node)); + ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics())); + } + } + } + } else { + LOG.debug("readAllFlow cached:{}", cached); + GetAllFlowStatisticsFromFlowTableInput input = + new GetAllFlowStatisticsFromFlowTableInputBuilder() + .setNode(NodeMapping.toNodeRef(node)) + .setTableId(new TableId(OPENFLOWV10_TABLE_ID)) + .build(); + + Future> future = + getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input); + + RpcResult result = null; + try { + // having a blocking call is fine here, as we need to join + // the notifications and return the result + result = future.get(); + } catch (Exception e) { + LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e); + return ret; + } + + GetAllFlowStatisticsFromFlowTableOutput output = result.getResult(); + if (output == null) { + return ret; + } + + TransactionId transactionId = output.getTransactionId(); + String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node)); + LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey); - for (final Flow flow : flows) { - final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class); - if (statsFromDataStore != null) { - final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node)); - output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics())); + // insert an entry in tempcache, will get updated when notification is received + txCache.put(cacheKey, new TransactionNotificationList( + transactionId, node.getNodeIDString())); + + TransactionNotificationList txnList = + (TransactionNotificationList) txCache.getIfPresent(cacheKey); + + // this loop would not be infinite as the cache will remove an entry + // after defined time if not written to + while (txnList != null && !txnList.areAllNotificationsGathered()) { + LOG.debug("readAllFlow waiting for notification..."); + waitForNotification(); + txnList = (TransactionNotificationList) txCache.getIfPresent(cacheKey); + } + + if (txnList == null) { + return ret; + } + + List notifications = txnList.getNotifications(); + for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) { + List flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList(); + if (flowAndStatisticsMapList != null) { + for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) { + final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node)); + ret.add(addFlowStats(it, flowAndStatistics)); + } } } } + return ret; + } - // TODO (main): Shall we send request to the switch? It will make async request to the switch. - // Once the plugin receives a response, it will let the adaptor know through onFlowStatisticsUpdate() - // If we assume that md-sal statistics manager will always be running, then it is not required - // But if not, then sending request will collect the latest data for adaptor at least. - getFlowStatisticsService().getAllFlowsStatisticsFromAllFlowTables( - new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder().setNode(NodeMapping.toNodeRef(node)).build()); - return output; + private String buildCacheKey(final TransactionId id, final NodeId nodeId) { + return String.valueOf(id.getValue()) + "-" + nodeId.getValue(); + } + + private void waitForNotification() { + try { + // going for a simple sleep approach,as wait-notify on a monitor would require + // us to maintain monitors per txn-node combo + Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS); + LOG.trace("statCollector is waking up from a wait stat Response sleep"); + } catch (final InterruptedException e) { + LOG.warn("statCollector has been interrupted waiting stat Response sleep", e); + } } @Override @@ -334,7 +416,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI @Override public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) { FlowOnNode ret = null; - final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID); + final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID); if (table != null) { final List flows = table.getFlow(); InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size()); @@ -386,7 +468,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI @Override public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) { NodeTableStatistics nodeStats = null; - final Table table = readConfigTable(nodeTable.getNode(), (short) nodeTable.getID()); + final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID()); if (table != null) { final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class); if (tableStats != null) { @@ -405,13 +487,11 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI return nodeStats; } - @Override - public void onNodeConnectorRemoved(final NodeConnectorRemoved update) { + public void onNodeConnectorRemovedInternal(final NodeConnectorRemoved update) { // Never received } - @Override - public void onNodeRemoved(final NodeRemoved notification) { + public void onNodeRemovedInternal(final NodeRemoved notification) { this.removeNodeConnectors(notification.getNodeRef().getValue()); try { final Node aDNode = NodeMapping.toADNode(notification.getNodeRef()); @@ -421,8 +501,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI } } - @Override - public void onNodeConnectorUpdated(final NodeConnectorUpdated update) { + public void onNodeConnectorUpdatedInternal(final NodeConnectorUpdated update) { final NodeConnectorRef ref = update.getNodeConnectorRef(); final UpdateType updateType; if (!this.isKnownNodeConnector(ref.getValue())) { @@ -441,8 +520,7 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI } } - @Override - public void onNodeUpdated(final NodeUpdated notification) { + public void onNodeUpdatedInternal(final NodeUpdated notification) { final NodeRef ref = notification.getNodeRef(); final UpdateType updateType; @@ -635,6 +713,8 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) { statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics); } + + updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies()); } /** @@ -790,4 +870,48 @@ public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInI private List removeNodeConnectors(final InstanceIdentifier nodeIdentifier) { return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1)); } + + private void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) { + + String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId); + TransactionNotificationList txnList = (TransactionNotificationList) txCache.getIfPresent(cacheKey); + final Optional> optional = Optional.>fromNullable(txnList); + if (optional.isPresent()) { + LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent()); + TransactionNotificationList txn = optional.get(); + txn.addNotification(notification); + txn.setAllNotificationsGathered(lastNotification); + } + } + + private class TransactionNotificationList { + private TransactionId id; + private String nId; + private List notifications; + private boolean allNotificationsGathered; + + public TransactionNotificationList(TransactionId id, String nId) { + this.nId = nId; + this.id = id; + notifications = new ArrayList(); + } + + public void addNotification(T notification) { + notifications.add(notification); + } + + public void setAllNotificationsGathered(boolean allNotificationsGathered) { + this.allNotificationsGathered = allNotificationsGathered; + } + + public boolean areAllNotificationsGathered() { + return allNotificationsGathered; + } + + public List getNotifications() { + return notifications; + } + + } + }