Bug 2158: Fixed TopologyManager for edge updates.
[controller.git] / opendaylight / adsal / topologymanager / implementation / src / main / java / org / opendaylight / controller / topologymanager / internal / TopologyManagerImpl.java
index 659ee7dd81ca83d91c013ceddb7017edca9a8b1b..e1a0ca1e7686c39a8e7563e6f50cb81254fe8d99 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.sal.utils.IObjectReader;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.opendaylight.controller.topologymanager.ITopologyManager;
 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
@@ -58,6 +59,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,6 +79,7 @@ public class TopologyManagerImpl implements
         IConfigurationContainerAware,
         IListenTopoUpdates,
         IObjectReader,
+        IInventoryListener,
         CommandProvider {
     protected static final String TOPOEDGESDB = "topologymanager.edgesDB";
     protected static final String TOPOHOSTSDB = "topologymanager.hostsDB";
@@ -83,6 +87,8 @@ public class TopologyManagerImpl implements
     protected static final String TOPOUSERLINKSDB = "topologymanager.userLinksDB";
     private static final String USER_LINKS_FILE_NAME = "userTopology.conf";
     private static final Logger log = LoggerFactory.getLogger(TopologyManagerImpl.class);
+    private static final long PENDING_UPDATE_TIMEOUT = 5000L;
+
     private ITopologyService topoService;
     private IClusterContainerServices clusterContainerService;
     private IConfigurationContainerService configurationService;
@@ -104,7 +110,79 @@ public class TopologyManagerImpl implements
     private BlockingQueue<TopoEdgeUpdate> notifyQ = new LinkedBlockingQueue<TopoEdgeUpdate>();
     private volatile Boolean shuttingDown = false;
     private Thread notifyThread;
+    private final Map<NodeConnector, List<PendingUpdateTask>> pendingUpdates =
+        new HashMap<NodeConnector, List<PendingUpdateTask>>();
+    private final BlockingQueue<TopoEdgeUpdate> updateQ =
+        new LinkedBlockingQueue<TopoEdgeUpdate>();
+    private Timer pendingTimer;
+    private Thread updateThread;
+
+    private class PendingEdgeUpdate extends TopoEdgeUpdate {
+        private PendingEdgeUpdate(Edge e, Set<Property> p, UpdateType t) {
+            super(e, p, t);
+        }
+    }
+
+    private class UpdateTopology implements Runnable {
+        @Override
+        public void run() {
+            log.trace("Start topology update thread");
+
+            while (!shuttingDown) {
+                try {
+                    List<TopoEdgeUpdate> list = new ArrayList<TopoEdgeUpdate>();
+                    TopoEdgeUpdate teu = updateQ.take();
+                    for (; teu != null; teu = updateQ.poll()) {
+                        list.add(teu);
+                    }
+
+                    if (!list.isEmpty()) {
+                        log.trace("Update edges: {}", list);
+                        doEdgeUpdate(list);
+                    }
+                } catch (InterruptedException e) {
+                    if (shuttingDown) {
+                        break;
+                    }
+                    log.warn("Topology update thread interrupted", e);
+                } catch (Exception e) {
+                    log.error("Exception on topology update thread", e);
+                }
+            }
+
+            log.trace("Exit topology update thread");
+        }
+    }
+
+    private class PendingUpdateTask extends TimerTask {
+        private final Edge  edge;
+        private final Set<Property>  props;
+        private final UpdateType  type;
+
+        private PendingUpdateTask(Edge e, Set<Property> p, UpdateType t) {
+            edge = e;
+            props = p;
+            type = t;
+        }
+
+        private NodeConnector getHeadNodeConnector() {
+            return edge.getHeadNodeConnector();
+        }
+
+        private void flush() {
+            log.info("Flush pending topology update: edge {}, type {}",
+                     edge, type);
+            updateQ.add(new PendingEdgeUpdate(edge, props, type));
+        }
 
+        @Override
+        public void run() {
+            if (removePendingEvent(this)) {
+                log.warn("Pending topology update timed out: edge{}, type {}",
+                         edge, type);
+            }
+        }
+    }
 
     void nonClusterObjectCreate() {
         edgesDB = new ConcurrentHashMap<Edge, Set<Property>>();
@@ -210,6 +288,8 @@ public class TopologyManagerImpl implements
         // Restore the shuttingDown status on init of the component
         shuttingDown = false;
         notifyThread = new Thread(new TopologyNotify(notifyQ));
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
     }
 
     @SuppressWarnings({ "unchecked" })
@@ -277,6 +357,8 @@ public class TopologyManagerImpl implements
      *
      */
     void started() {
+        updateThread.start();
+
         // Start the batcher thread for the cluster wide topology updates
         notifyThread.start();
         // SollicitRefresh MUST be called here else if called at init
@@ -287,7 +369,9 @@ public class TopologyManagerImpl implements
 
     void stop() {
         shuttingDown = true;
+        updateThread.interrupt();
         notifyThread.interrupt();
+        pendingTimer.cancel();
     }
 
     /**
@@ -297,6 +381,9 @@ public class TopologyManagerImpl implements
      *
      */
     void destroy() {
+        updateQ.clear();
+        updateThread = null;
+        pendingTimer = null;
         notifyQ.clear();
         notifyThread = null;
     }
@@ -571,17 +658,100 @@ public class TopologyManagerImpl implements
         return (switchManager.doesNodeConnectorExist(head));
     }
 
+    private void addPendingEvent(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list == null) {
+                list = new LinkedList<PendingUpdateTask>();
+                pendingUpdates.put(head, list);
+            }
+            list.add(task);
+            pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+        }
+    }
+
+    private boolean enqueueEventIfPending(Edge e, Set<Property> p, UpdateType t) {
+        NodeConnector head = e.getHeadNodeConnector();
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                log.warn("Enqueue edge update: edge {}, type {}", e, t);
+                PendingUpdateTask task = new PendingUpdateTask(e, p, t);
+                list.add(task);
+                pendingTimer.schedule(task, PENDING_UPDATE_TIMEOUT);
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean removePendingEvent(PendingUpdateTask t) {
+        t.cancel();
+        NodeConnector head = t.getHeadNodeConnector();
+        boolean removed = false;
+
+        synchronized (pendingUpdates) {
+            List<PendingUpdateTask> list = pendingUpdates.get(head);
+            if (list != null) {
+                removed = list.remove(t);
+                if (list.isEmpty()) {
+                    pendingUpdates.remove(head);
+                }
+            }
+        }
+
+        return removed;
+    }
+
+    private void removePendingEvent(NodeConnector head, boolean doFlush) {
+        List<PendingUpdateTask> list;
+        synchronized (pendingUpdates) {
+            list = pendingUpdates.remove(head);
+        }
+
+        if (list != null) {
+            for (PendingUpdateTask task : list) {
+                if (task.cancel() && doFlush) {
+                    task.flush();
+                }
+            }
+            pendingTimer.purge();
+        }
+    }
+
     private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props) {
-        switch (type) {
-        case ADDED:
+        return edgeUpdate(e, type, props, false);
+    }
 
+    private TopoEdgeUpdate edgeUpdate(Edge e, UpdateType type, Set<Property> props, boolean isPending) {
+        if (!type.equals(UpdateType.ADDED) &&
+            enqueueEventIfPending(e, props, type)) {
+            return null;
+        }
 
+        switch (type) {
+        case ADDED:
             if (this.edgesDB.containsKey(e)) {
                 // Avoid redundant updates (e.g. cluster switch-over) as notifications trigger expensive tasks
                 log.trace("Skipping redundant edge addition: {}", e);
                 return null;
             }
 
+            // Ensure that head node connector exists
+            if (!isPending) {
+                if (headNodeConnectorExist(e)) {
+                    removePendingEvent(e.getHeadNodeConnector(), true);
+                } else {
+                    log.warn("Ignore edge that contains invalid node connector: {}",
+                             e);
+                    addPendingEvent(e, props, type);
+                    return null;
+                }
+            }
+
             // Make sure the props are non-null or create a copy
             if (props == null) {
                 props = new HashSet<Property>();
@@ -589,13 +759,6 @@ public class TopologyManagerImpl implements
                 props = new HashSet<Property>(props);
             }
 
-
-            // Ensure that head node connector exists
-            if (!headNodeConnectorExist(e)) {
-                log.warn("Ignore edge that contains invalid node connector: {}", e);
-                return null;
-            }
-
             // Check if nodeConnectors of the edge were correctly categorized
             // by protocol plugin
             crossCheckNodeConnectors(e);
@@ -702,16 +865,16 @@ public class TopologyManagerImpl implements
         return new TopoEdgeUpdate(e, props, type);
     }
 
-    @Override
-    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+    private void doEdgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
-        for (int i = 0; i < topoedgeupdateList.size(); i++) {
-            Edge e = topoedgeupdateList.get(i).getEdge();
-            Set<Property> p = topoedgeupdateList.get(i).getProperty();
-            UpdateType type = topoedgeupdateList.get(i).getUpdateType();
-            TopoEdgeUpdate teu = edgeUpdate(e, type, p);
-            if (teu != null) {
-                teuList.add(teu);
+        for (TopoEdgeUpdate teu : topoedgeupdateList) {
+            boolean isPending = (teu instanceof PendingEdgeUpdate);
+            Edge e = teu.getEdge();
+            Set<Property> p = teu.getProperty();
+            UpdateType type = teu.getUpdateType();
+            TopoEdgeUpdate update = edgeUpdate(e, type, p, isPending);
+            if (update != null) {
+                teuList.add(update);
             }
         }
 
@@ -727,6 +890,11 @@ public class TopologyManagerImpl implements
         }
     }
 
+    @Override
+    public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
+        updateQ.addAll(topoedgeupdateList);
+    }
+
     private Edge getReverseLinkTuple(TopologyUserLinkConfig link) {
         TopologyUserLinkConfig rLink = new TopologyUserLinkConfig(
                 link.getName(), link.getDstNodeConnector(), link.getSrcNodeConnector());
@@ -934,6 +1102,19 @@ public class TopologyManagerImpl implements
         notifyQ.add(upd);
     }
 
+    @Override
+    public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
+        // NOP
+    }
+
+    @Override
+    public void notifyNodeConnector(NodeConnector nc, UpdateType type, Map<String, Property> propMap) {
+        // Remove pending edge updates for the given node connector.
+        // Pending events should be notified if the node connector exists.
+        boolean doFlush = !type.equals(UpdateType.REMOVED);
+        removePendingEvent(nc, doFlush);
+    }
+
     @Override
     public void entryCreated(final Object key, final String cacheName, final boolean originLocal) {
         if (cacheName.equals(TOPOEDGESDB)) {
@@ -1094,4 +1275,35 @@ public class TopologyManagerImpl implements
         return result;
     }
 
+    // Only for unit test.
+    void startTest() {
+        pendingTimer = new Timer("Topology Pending Update Timer");
+        updateThread = new Thread(new UpdateTopology(), "Topology Update");
+        updateThread.start();
+    }
+
+    void stopTest() {
+        shuttingDown = true;
+        updateThread.interrupt();
+        pendingTimer.cancel();
+    }
+
+    boolean flushUpdateQueue(long timeout) {
+        long limit = System.currentTimeMillis() + timeout;
+        long cur;
+        do {
+            if (updateQ.peek() == null) {
+                return true;
+            }
+
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                break;
+            }
+            cur = System.currentTimeMillis();
+        } while (cur < limit);
+
+        return false;
+    }
 }