*/
package org.opendaylight.controller.sal.compatibility;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.table.statistics.rev131215.flow.table.statistics.FlowTableStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.Link;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.node.connector.statistics.and.port.number.map.NodeConnectorStatisticsAndPortNumberMap;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightInventoryListener, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class InventoryAndReadAdapter implements IPluginInReadService, IPluginInInventoryService, OpendaylightFlowStatisticsListener, OpendaylightFlowTableStatisticsListener, OpendaylightPortStatisticsListener {
private static final Logger LOG = LoggerFactory.getLogger(InventoryAndReadAdapter.class);
private static final short OPENFLOWV10_TABLE_ID = 0;
+ private static final int SLEEP_FOR_NOTIFICATIONS_MILLIS = 500;
private final InventoryNotificationProvider inventoryNotificationProvider = new InventoryNotificationProvider();
private final Map<PathArgument,List<PathArgument>> nodeToNodeConnectorsMap = new ConcurrentHashMap<>();
private List<IPluginOutInventoryService> inventoryPublisher = new CopyOnWriteArrayList<>();
private List<IPluginOutReadService> statisticsPublisher = new CopyOnWriteArrayList<>();
+ private Cache<String, TransactionNotificationList<? extends TransactionAware>> txCache;
private OpendaylightFlowTableStatisticsService flowTableStatisticsService;
private OpendaylightPortStatisticsService nodeConnectorStatisticsService;
public void startAdapter() {
inventoryNotificationProvider.setDataProviderService(getDataProviderService());
inventoryNotificationProvider.setInventoryPublisher(getInventoryPublisher());
+ txCache = CacheBuilder.newBuilder().expireAfterWrite(60L, TimeUnit.SECONDS).maximumSize(10000).build();
// inventoryNotificationProvider.start();
}
* @param id Table id
* @return Table contents, or null if not present
*/
- private Table readConfigTable(final Node node, final short id) {
+ private Table readOperationalTable(final Node node, final short id) {
final InstanceIdentifier<Table> tableRef = InstanceIdentifier.builder(Nodes.class)
- .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, InventoryMapping.toNodeKey(node))
+ .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, NodeMapping.toNodeKey(node))
.augmentation(FlowCapableNode.class)
.child(Table.class, new TableKey(id))
.build();
- return (Table) startChange().readConfigurationData(tableRef);
+ return (Table) startChange().readOperationalData(tableRef);
}
@Override
public List<FlowOnNode> readAllFlow(final Node node, final boolean cached) {
- final ArrayList<FlowOnNode> output = new ArrayList<>();
- final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
- if (table != null) {
- final List<Flow> flows = table.getFlow();
- LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+ final ArrayList<FlowOnNode> ret= new ArrayList<>();
+ if (cached) {
+ final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
+ if (table != null) {
+ final List<Flow> flows = table.getFlow();
+ LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
+
+ for (final Flow flow : flows) {
+ final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
+ if (statsFromDataStore != null) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
+ ret.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ }
+ }
+ }
+ } else {
+ LOG.debug("readAllFlow cached:{}", cached);
+ GetAllFlowStatisticsFromFlowTableInput input =
+ new GetAllFlowStatisticsFromFlowTableInputBuilder()
+ .setNode(NodeMapping.toNodeRef(node))
+ .setTableId(new TableId(OPENFLOWV10_TABLE_ID))
+ .build();
+
+ Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> future =
+ getFlowStatisticsService().getAllFlowStatisticsFromFlowTable(input);
+
+ RpcResult<GetAllFlowStatisticsFromFlowTableOutput> result = null;
+ try {
+ // having a blocking call is fine here, as we need to join
+ // the notifications and return the result
+ result = future.get();
+ } catch (Exception e) {
+ LOG.error("Exception in getAllFlowStatisticsFromFlowTable ", e);
+ return ret;
+ }
+
+ GetAllFlowStatisticsFromFlowTableOutput output = result.getResult();
+ if (output == null) {
+ return ret;
+ }
+
+ TransactionId transactionId = output.getTransactionId();
+ String cacheKey = buildCacheKey(transactionId, NodeMapping.toNodeId(node));
+ LOG.info("readAllFlow transactionId:{} cacheKey:{}", transactionId, cacheKey);
- for (final Flow flow : flows) {
- final FlowStatisticsData statsFromDataStore = flow.getAugmentation(FlowStatisticsData.class);
- if (statsFromDataStore != null) {
- final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flow, node));
- output.add(addFlowStats(it, statsFromDataStore.getFlowStatistics()));
+ // insert an entry in tempcache, will get updated when notification is received
+ txCache.put(cacheKey, new TransactionNotificationList<FlowsStatisticsUpdate>(
+ transactionId, node.getNodeIDString()));
+
+ TransactionNotificationList<FlowsStatisticsUpdate> txnList =
+ (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+
+ // this loop would not be infinite as the cache will remove an entry
+ // after defined time if not written to
+ while (txnList != null && !txnList.areAllNotificationsGathered()) {
+ LOG.debug("readAllFlow waiting for notification...");
+ waitForNotification();
+ txnList = (TransactionNotificationList<FlowsStatisticsUpdate>) txCache.getIfPresent(cacheKey);
+ }
+
+ if (txnList == null) {
+ return ret;
+ }
+
+ List<FlowsStatisticsUpdate> notifications = txnList.getNotifications();
+ for (FlowsStatisticsUpdate flowsStatisticsUpdate : notifications) {
+ List<FlowAndStatisticsMapList> flowAndStatisticsMapList = flowsStatisticsUpdate.getFlowAndStatisticsMapList();
+ if (flowAndStatisticsMapList != null) {
+ for (FlowAndStatisticsMapList flowAndStatistics : flowAndStatisticsMapList) {
+ final FlowOnNode it = new FlowOnNode(ToSalConversionsUtils.toFlow(flowAndStatistics, node));
+ ret.add(addFlowStats(it, flowAndStatistics));
+ }
}
}
}
+ return ret;
+ }
- // TODO (main): Shall we send request to the switch? It will make async request to the switch.
- // Once the plugin receives a response, it will let the adaptor know through onFlowStatisticsUpdate()
- // If we assume that md-sal statistics manager will always be running, then it is not required
- // But if not, then sending request will collect the latest data for adaptor at least.
- getFlowStatisticsService().getAllFlowsStatisticsFromAllFlowTables(
- new GetAllFlowsStatisticsFromAllFlowTablesInputBuilder().setNode(NodeMapping.toNodeRef(node)).build());
- return output;
+ private String buildCacheKey(final TransactionId id, final NodeId nodeId) {
+ return String.valueOf(id.getValue()) + "-" + nodeId.getValue();
+ }
+
+ private void waitForNotification() {
+ try {
+ // going for a simple sleep approach,as wait-notify on a monitor would require
+ // us to maintain monitors per txn-node combo
+ Thread.sleep(SLEEP_FOR_NOTIFICATIONS_MILLIS);
+ LOG.trace("statCollector is waking up from a wait stat Response sleep");
+ } catch (final InterruptedException e) {
+ LOG.warn("statCollector has been interrupted waiting stat Response sleep", e);
+ }
}
@Override
@Override
public FlowOnNode readFlow(final Node node, final org.opendaylight.controller.sal.flowprogrammer.Flow targetFlow, final boolean cached) {
FlowOnNode ret = null;
- final Table table = readConfigTable(node, OPENFLOWV10_TABLE_ID);
+ final Table table = readOperationalTable(node, OPENFLOWV10_TABLE_ID);
if (table != null) {
final List<Flow> flows = table.getFlow();
InventoryAndReadAdapter.LOG.trace("Number of flows installed in table 0 of node {} : {}", node, flows.size());
@Override
public NodeTableStatistics readNodeTable(final NodeTable nodeTable, final boolean cached) {
NodeTableStatistics nodeStats = null;
- final Table table = readConfigTable(nodeTable.getNode(), (short) nodeTable.getID());
+ final Table table = readOperationalTable(nodeTable.getNode(), (short) nodeTable.getID());
if (table != null) {
final FlowTableStatisticsData tableStats = table.getAugmentation(FlowTableStatisticsData.class);
if (tableStats != null) {
return nodeStats;
}
- @Override
- public void onNodeConnectorRemoved(final NodeConnectorRemoved update) {
+ public void onNodeConnectorRemovedInternal(final NodeConnectorRemoved update) {
// Never received
}
- @Override
- public void onNodeRemoved(final NodeRemoved notification) {
+ public void onNodeRemovedInternal(final NodeRemoved notification) {
this.removeNodeConnectors(notification.getNodeRef().getValue());
try {
final Node aDNode = NodeMapping.toADNode(notification.getNodeRef());
}
}
- @Override
- public void onNodeConnectorUpdated(final NodeConnectorUpdated update) {
+ public void onNodeConnectorUpdatedInternal(final NodeConnectorUpdated update) {
final NodeConnectorRef ref = update.getNodeConnectorRef();
final UpdateType updateType;
if (!this.isKnownNodeConnector(ref.getValue())) {
}
}
- @Override
- public void onNodeUpdated(final NodeUpdated notification) {
+ public void onNodeUpdatedInternal(final NodeUpdated notification) {
final NodeRef ref = notification.getNodeRef();
final UpdateType updateType;
for (final IPluginOutReadService statsPublisher : getStatisticsPublisher()) {
statsPublisher.nodeFlowStatisticsUpdated(aDNode, adsalFlowsStatistics);
}
+
+ updateTransactionCache(notification, notification.getId(), !notification.isMoreReplies());
}
/**
private List<PathArgument> removeNodeConnectors(final InstanceIdentifier<? extends Object> nodeIdentifier) {
return this.nodeToNodeConnectorsMap.remove(Iterables.get(nodeIdentifier.getPathArguments(), 1));
}
+
+ private <T extends TransactionAware> void updateTransactionCache(T notification, NodeId nodeId, boolean lastNotification) {
+
+ String cacheKey = buildCacheKey(notification.getTransactionId(), nodeId);
+ TransactionNotificationList<T> txnList = (TransactionNotificationList<T>) txCache.getIfPresent(cacheKey);
+ final Optional<TransactionNotificationList<T>> optional = Optional.<TransactionNotificationList<T>>fromNullable(txnList);
+ if (optional.isPresent()) {
+ LOG.info("updateTransactionCache cacheKey:{}, lastNotification:{}, txnList-present:{}", cacheKey, lastNotification, optional.isPresent());
+ TransactionNotificationList<T> txn = optional.get();
+ txn.addNotification(notification);
+ txn.setAllNotificationsGathered(lastNotification);
+ }
+ }
+
+ private class TransactionNotificationList<T extends TransactionAware> {
+ private TransactionId id;
+ private String nId;
+ private List<T> notifications;
+ private boolean allNotificationsGathered;
+
+ public TransactionNotificationList(TransactionId id, String nId) {
+ this.nId = nId;
+ this.id = id;
+ notifications = new ArrayList<T>();
+ }
+
+ public void addNotification(T notification) {
+ notifications.add(notification);
+ }
+
+ public void setAllNotificationsGathered(boolean allNotificationsGathered) {
+ this.allNotificationsGathered = allNotificationsGathered;
+ }
+
+ public boolean areAllNotificationsGathered() {
+ return allNotificationsGathered;
+ }
+
+ public List<T> getNotifications() {
+ return notifications;
+ }
+
+ }
+
}