-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
import org.opendaylight.controller.sal.topology.ITopologyService;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
props.put("cachenames", propSet);
c.setInterface(new String[] { IListenTopoUpdates.class.getName(),
+ IInventoryListener.class.getName(),
ITopologyManager.class.getName(),
ITopologyManagerShell.class.getName(),
IConfigurationContainerAware.class.getName(),
import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.topologymanager.ITopologyManager;
import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
IConfigurationContainerAware,
IListenTopoUpdates,
IObjectReader,
+ IInventoryListener,
CommandProvider {
protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+ private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
private ITopologyService topoService;
private IClusterContainerServices clusterContainerService;
private IConfigurationContainerService configurationService;
private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
private volatile Boolean shuttingDown = false;
private Thread notifyThread;
+ private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+ new HashMap<NodeConnector, List<PendingUpdateTask>>();
+ private final BlockingQueue<TopoEdgeUpdate> updateQ =
+ new LinkedBlockingQueue<TopoEdgeUpdate>();
+ private Timer pendingTimer;
+ private Thread updateThread;
+
+ private class PendingEdgeUpdate extends TopoEdgeUpdate {
+ private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+ super(e, p, t);
+ }
+ }
+
+ private class UpdateTopology implements Runnable {
+ @Override
+ public void run() {
+ log.trace("Start topology update thread");
+
+ while (!shuttingDown) {
+ try {
+ List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+ TopoEdgeUpdate teu = updateQ.take();
+ for (; teu != null; teu = updateQ.poll()) {
+ list.add(teu);
+ }
+
+ if (!list.isEmpty()) {
+ log.trace("Update edges: {}", list);
+ doEdgeUpdate(list);
+ }
+ } catch (InterruptedException e) {
+ if (shuttingDown) {
+ break;
+ }
+ log.warn("Topology update thread interrupted", e);
+ } catch (Exception e) {
+ log.error("Exception on topology update thread", e);
+ }
+ }
+
+ log.trace("Exit topology update thread");
+ }
+ }
+
+ private class PendingUpdateTask extends TimerTask {
+ private final Edge edge;
+ private final Set<Property> props;
+ private final UpdateType type;
+
+ private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+ edge = e;
+ props = p;
+ type = t;
+ }
+
+ private NodeConnector getHeadNodeConnector() {
+ return edge.getHeadNodeConnector();
+ }
+
+ private void flush() {
+ log.info("Flush pending topology update: edge {}, type {}",
+ edge, type);
+ updateQ.add(new PendingEdgeUpdate(edge, props, type));
+ }
+ @Override
+ public void run() {
+ if (removePendingEvent(this)) {
+ log.warn("Pending topology update timed out: edge{}, type {}",
+ edge, type);
+ }
+ }
+ }
void nonClusterObjectCreate() {
edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
// Restore the shuttingDown status on init of the component
shuttingDown = false;
notifyThread = new Thread(new TopologyNotify(notifyQ));
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
}
@SuppressWarnings({ "unchecked" })
*
*/
void started() {
+ updateThread.start();
+
// Start the batcher thread for the cluster wide topology updates
notifyThread.start();
// SollicitRefresh MUST be called here else if called at init
void stop() {
shuttingDown = true;
+ updateThread.interrupt();
notifyThread.interrupt();
+ pendingTimer.cancel();
}
/**
*
*/
void destroy() {
+ updateQ.clear();
+ updateThread = null;
+ pendingTimer = null;
notifyQ.clear();
notifyThread = null;
}
return (switchManager.doesNodeConnectorExist(head));
}
+ private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list == null) {
+ list = new LinkedList<PendingUpdateTask>();
+ pendingUpdates.put(head, list);
+ }
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ }
+ }
+
+ private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+ NodeConnector head = e.getHeadNodeConnector();
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ log.warn("Enqueue edge update: edge {}, type {}", e, t);
+ PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+ list.add(task);
+ pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean removePendingEvent(PendingUpdateTask t) {
+ t.cancel();
+ NodeConnector head = t.getHeadNodeConnector();
+ boolean removed = false;
+
+ synchronized (pendingUpdates) {
+ List<PendingUpdateTask> list = pendingUpdates.get(head);
+ if (list != null) {
+ removed = list.remove(t);
+ if (list.isEmpty()) {
+ pendingUpdates.remove(head);
+ }
+ }
+ }
+
+ return removed;
+ }
+
+ private void removePendingEvent(NodeConnector head, boolean doFlush) {
+ List<PendingUpdateTask> list;
+ synchronized (pendingUpdates) {
+ list = pendingUpdates.remove(head);
+ }
+
+ if (list != null) {
+ for (PendingUpdateTask task : list) {
+ if (task.cancel() && doFlush) {
+ task.flush();
+ }
+ }
+ pendingTimer.purge();
+ }
+ }
+
private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
- switch (type) {
- case ADDED:
+ return edgeUpdate(e, type, props, false);
+ }
+ private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+ if (!type.equals(UpdateType.ADDED) &&
+ enqueueEventIfPending(e, props, type)) {
+ return null;
+ }
+ switch (type) {
+ case ADDED:
if (this.edgesDB.containsKey(e)) {
// Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
log.trace("Skipping redundant edge addition: {}", e);
return null;
}
+ // Ensure that head node connector exists
+ if (!isPending) {
+ if (headNodeConnectorExist(e)) {
+ removePendingEvent(e.getHeadNodeConnector(), true);
+ } else {
+ log.warn("Ignore edge that contains invalid node connector: {}",
+ e);
+ addPendingEvent(e, props, type);
+ return null;
+ }
+ }
+
// Make sure the props are non-null or create a copy
if (props == null) {
props = new HashSet<Property>();
props = new HashSet<Property>(props);
}
-
- // Ensure that head node connector exists
- if (!headNodeConnectorExist(e)) {
- log.warn("Ignore edge that contains invalid node connector: {}", e);
- return null;
- }
-
// Check if nodeConnectors of the edge were correctly categorized
// by protocol plugin
crossCheckNodeConnectors(e);
return new TopoEdgeUpdate(e, props, type);
}
- @Override
- public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
- for (int i = 0; i < topoedgeupdateList.size(); i++) {
- Edge e = topoedgeupdateList.get(i).getEdge();
- Set<Property> p = topoedgeupdateList.get(i).getProperty();
- UpdateType type = topoedgeupdateList.get(i).getUpdateType();
- TopoEdgeUpdate teu = edgeUpdate(e, type, p);
- if (teu != null) {
- teuList.add(teu);
+ for (TopoEdgeUpdate teu : topoedgeupdateList) {
+ boolean isPending = (teu instanceof PendingEdgeUpdate);
+ Edge e = teu.getEdge();
+ Set<Property> p = teu.getProperty();
+ UpdateType type = teu.getUpdateType();
+ TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+ if (update != null) {
+ teuList.add(update);
}
}
}
}
+ @Override
+ public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+ updateQ.addAll(topoedgeupdateList);
+ }
+
private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
notifyQ.add(upd);
}
+ @Override
+ public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+ // NOP
+ }
+
+ @Override
+ public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+ // Remove pending edge updates for the given node connector.
+ // Pending events should be notified if the node connector exists.
+ boolean doFlush = !type.equals(UpdateType.REMOVED);
+ removePendingEvent(nc, doFlush);
+ }
+
@Override
public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
if (cacheName.equals(TOPOEDGESDB)) {
return result;
}
+ // Only for unit test.
+ void startTest() {
+ pendingTimer = new Timer("Topology Pending Update Timer");
+ updateThread = new Thread(new UpdateTopology(), "Topology Update");
+ updateThread.start();
+ }
+
+ void stopTest() {
+ shuttingDown = true;
+ updateThread.interrupt();
+ pendingTimer.cancel();
+ }
+
+ boolean flushUpdateQueue(long timeout) {
+ long limit = System.currentTimeMillis() + timeout;
+ long cur;
+ do {
+ if (updateQ.peek() == null) {
+ return true;
+ }
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ break;
+ }
+ cur = System.currentTimeMillis();
+ } while (cur < limit);
+
+ return false;
+ }
}
package org.opendaylight.controller.topologymanager.internal;
import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.Bandwidth;
import org.opendaylight.controller.sal.core.ConstructionException;
import java.util.concurrent.ConcurrentMap;
public class TopologyManagerImplTest {
+ private TopologyManagerImpl topoManagerImpl;
+
/**
* Mockup of switch manager that only maintains existence of node
* connector.
}
}
+ private void clear() {
+ nodeSet.clear();
+ nodeConnectorSet.clear();
+ }
+
@Override
public Status addSubnet(SubnetConfig configObject) {
return null;
}
}
+ @Before
+ public void setUp() {
+ topoManagerImpl = new TopologyManagerImpl();
+ topoManagerImpl.startTest();
+ }
+
+ @After
+ public void tearDown() {
+ if (topoManagerImpl != null) {
+ topoManagerImpl.stopTest();
+ topoManagerImpl = null;
+ }
+ }
+
/*
* Sets the node, edges and properties for edges here: Edge <SwitchId :
* NodeConnectorId> : <1:1>--><11:11>; <1:2>--><11:12>; <3:3>--><13:13>;
topoedgeupdateList.add(teu2);
topoManagerImpl.edgeUpdate(topoedgeupdateList);
}
+
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
}
@Test
public void testGetNodeEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
@Test
public void testGetEdges() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
setNodeEdges(topoManagerImpl, swMgr);
TopologyUserLinkConfig link4 = new TopologyUserLinkConfig("default20",
"OF|10@OF|20", "OF|10@OF|30");
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
public void testGetUserLink() {
TopologyUserLinkConfig[] link = new TopologyUserLinkConfig[5];
TopologyUserLinkConfig[] reverseLink = new TopologyUserLinkConfig[5];
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testHostLinkMethods() throws ConstructionException,
UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void testGetNodesWithNodeConnectorHost()
throws ConstructionException, UnknownHostException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
@Test
public void bug1348FixTest() throws ConstructionException {
- TopologyManagerImpl topoManagerImpl = new TopologyManagerImpl();
TestSwitchManager swMgr = new TestSwitchManager();
topoManagerImpl.setSwitchManager(swMgr);
topoManagerImpl.nonClusterObjectCreate();
Assert.fail("Exception was raised when trying to update edge properties: " + e.getMessage());
}
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
Assert.assertEquals(1, topoManagerImpl.getEdges().size());
Assert.assertNotNull(topoManagerImpl.getEdges().get(edge));
}
+
+ @Test
+ public void testNotifyNodeConnector() throws ConstructionException {
+ TestSwitchManager swMgr = new TestSwitchManager();
+ topoManagerImpl.setSwitchManager(swMgr);
+ topoManagerImpl.nonClusterObjectCreate();
+
+ // Test NodeConnector notification in the case that there are no
+ // related edge updates.
+ NodeConnector nc1 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 1, NodeCreator.createOFNode(1000L));
+ Map<String, Property> propMap = new HashMap<>();
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.CHANGED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ swMgr.clear();
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+
+ // Test NodeConnector notification in the case that there is a related
+ // edge update just before the notification.
+ NodeConnector nc2 = NodeConnectorCreator.createOFNodeConnector(
+ (short) 2, NodeCreator.createOFNode(2000L));
+ Edge edge1 = new Edge(nc1, nc2);
+ Edge edge2 = new Edge(nc2, nc1);
+ Set<Property> props = new HashSet<Property>();
+ TopoEdgeUpdate teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ List<TopoEdgeUpdate> topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(2, topoManagerImpl.getEdges().size());
+
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+
+ swMgr.clear();
+
+ // Test NodeConnector notification in the case that there are multiple
+ // edge updates related to the NodeConnector just before the notification.
+ teu1 = new TopoEdgeUpdate(edge1, props, UpdateType.ADDED);
+ teu2 = new TopoEdgeUpdate(edge2, props, UpdateType.ADDED);
+ TopoEdgeUpdate teu3 = new TopoEdgeUpdate(edge1, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu4 = new TopoEdgeUpdate(edge2, props, UpdateType.CHANGED);
+ TopoEdgeUpdate teu5 = new TopoEdgeUpdate(edge1, props, UpdateType.REMOVED);
+ TopoEdgeUpdate teu6 = new TopoEdgeUpdate(edge2, props, UpdateType.REMOVED);
+ topoedgeupdateList = new ArrayList<TopoEdgeUpdate>();
+ topoedgeupdateList.add(teu1);
+ topoedgeupdateList.add(teu2);
+ topoedgeupdateList.add(teu3);
+ topoedgeupdateList.add(teu4);
+ topoedgeupdateList.add(teu5);
+ topoedgeupdateList.add(teu6);
+ topoManagerImpl.edgeUpdate(topoedgeupdateList);
+ swMgr.addNodeConnectors(nc1);
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.ADDED, propMap);
+ swMgr.addNodeConnectors(nc2);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.CHANGED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ topoManagerImpl.notifyNodeConnector(nc1, UpdateType.REMOVED, propMap);
+ topoManagerImpl.notifyNodeConnector(nc2, UpdateType.REMOVED, propMap);
+ Assert.assertTrue(topoManagerImpl.flushUpdateQueue(5000));
+ Assert.assertEquals(0, topoManagerImpl.getEdges().size());
+ }
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.queues.Queue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.OpendaylightGroupStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.OpendaylightMeterStatisticsListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.OpendaylightPortStatisticsListener;
* Internal {@link TransactionChainListener} joining all DS commits
* to Set of chained changes for prevent often DataStore touches.
*/
- public interface StatDataStoreOperation {
+ public abstract class StatDataStoreOperation {
+ public enum StatsManagerOperationType {
+ /**
+ * Operation will carry out work related to new node addition /
+ * update
+ */
+ NODE_UPDATE,
+ /**
+ * Operation will carry out work related to node removal
+ */
+ NODE_REMOVAL,
+ /**
+ * Operation will commit data to the operational data store
+ */
+ DATA_COMMIT_OPER_DS
+ }
+
+ private NodeId nodeId;
+ private StatsManagerOperationType operationType = StatsManagerOperationType.DATA_COMMIT_OPER_DS;
+
+ public StatDataStoreOperation(final StatsManagerOperationType operType, final NodeId id){
+ if(operType != null){
+ operationType = operType;
+ }
+ nodeId = id;
+ }
+
+ public final StatsManagerOperationType getType() {
+ return operationType;
+ }
+
+ public final NodeId getNodeId(){
+ return nodeId;
+ }
/**
- * Apply all read / write (put|merge) operation
- * for DataStore
+ * Apply all read / write (put|merge) operation for DataStore
+ *
* @param {@link ReadWriteTransaction} tx
*/
- void applyOperation(ReadWriteTransaction tx);
+ public abstract void applyOperation(ReadWriteTransaction tx);
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.md.statistics.manager.impl.helper.FlowComparator;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
return;
}
/* check flow Capable Node and write statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
return;
}
/* add flow's statistics */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent, transId);
}
+
});
}
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
/* Get and Validate TransactionCacheContainer */
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction tx) {
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FeatureCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
Preconditions.checkNotNull(data, "SwitchFeatures data for {} can not be null!", keyIdent);
Preconditions.checkArgument(( ! keyIdent.isWildcarded()), "InstanceIdentifier is WildCarded!");
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_UPDATE,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
Preconditions.checkArgument(nodeIdent != null, "InstanceIdentifier can not be NULL!");
Preconditions.checkArgument(( ! nodeIdent.isWildcarded()),
"InstanceIdentifier {} is WildCarded!", nodeIdent);
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.NODE_REMOVAL,nodeIdent.firstKeyOf(Node.class, NodeKey.class).getId()) {
+
@Override
public void applyOperation(final ReadWriteTransaction tx) {
manager.disconnectedNodeUnregistration(nodeIdent);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionAware;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
.child(Node.class, new NodeKey(nodeId));
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final Optional<TransactionCacheContainer<?>> txContainer = getTransactionCacheContainer(transId, nodeId);
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
return;
}
/* Don't block RPC Notification thread */
- manager.enqueue(new StatDataStoreOperation() {
+ manager.enqueue(new StatDataStoreOperation(StatsManagerOperationType.DATA_COMMIT_OPER_DS,nodeId) {
@Override
public void applyOperation(final ReadWriteTransaction trans) {
final List<FlowTableAndStatisticsMap> tableStats = new ArrayList<FlowTableAndStatisticsMap>(10);
import org.opendaylight.controller.md.statistics.manager.StatPermCollector.StatCapabTypes;
import org.opendaylight.controller.md.statistics.manager.StatRpcMsgManager;
import org.opendaylight.controller.md.statistics.manager.StatisticsManager;
+import org.opendaylight.controller.md.statistics.manager.StatisticsManager.StatDataStoreOperation.StatsManagerOperationType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
- dataStoreOperQueue.poll();
+ StatDataStoreOperation op = dataStoreOperQueue.poll();
+
+ // Execute the node removal clean up operation if queued in the
+ // operational queue.
+ if (op.getType() == StatsManagerOperationType.NODE_REMOVAL) {
+ try {
+ LOG.debug("Node {} disconnected. Cleaning internal data.",op.getNodeId());
+ op.applyOperation(null);
+ } catch (final Exception ex) {
+ LOG.warn("Unhandled exception while cleaning up internal data of node [{}]",op.getNodeId());
+ }
+ }
}
}