Add support for openflow node callbacks
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
index 2a47cd589c99e0cd02fea7429850aaba52f75ab5..5bf95d76e0da7bea774ac7480e13c8fb01dba773 100644 (file)
@@ -18,11 +18,14 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
 
     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
     private List<Service> staticPipeline = Lists.newArrayList(
@@ -39,7 +42,7 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
             Service.L2_FORWARDING
     );
     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-    private volatile BlockingQueue<String> queue;
+    private volatile BlockingQueue<Node> queue;
     private ExecutorService eventHandler;
     public PipelineOrchestratorImpl() {
     }
@@ -68,7 +71,7 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
 
     public void init() {
         eventHandler = Executors.newSingleThreadExecutor();
-        this.queue = new LinkedBlockingQueue<String>();
+        this.queue = new LinkedBlockingQueue<Node>();
         logger.info(">>>>> init PipelineOrchestratorImpl");
     }
 
@@ -78,20 +81,20 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
             public void run() {
                 try {
                     while (true) {
-                        String nodeId = queue.take();
+                        Node node = queue.take();
                         /*
                          * Since we are hooking on OpendaylightInventoryListener and as observed in
                          * Bug 1997 multiple Threads trying to write to a same table at the same time
                          * causes programming issues. Hence delaying the programming by a second to
                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
                          */
-                        logger.info(">>>>> dequeue: {}", nodeId);
+                        logger.info(">>>>> dequeue: {}", node);
                         Thread.sleep(1000);
                         for (Service service : staticPipeline) {
                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
                             logger.info("pipeline: {} - {}", service, serviceInstance);
                             if (serviceInstance != null) {
-                                serviceInstance.programDefaultPipelineRule(nodeId);
+                                serviceInstance.programDefaultPipelineRule(node);
                             }
                         }
                     }
@@ -113,12 +116,17 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
     }
 
     @Override
-    public void enqueue(String nodeId) {
-        logger.info(">>>>> enqueue: {}", nodeId);
+    public void enqueue(Node node) {
+        logger.info(">>>>> enqueue: {}", node);
         try {
-            queue.put(new String(nodeId));
+            queue.put(node);
         } catch (InterruptedException e) {
-            logger.warn("Failed to enqueue operation {}", nodeId, e);
+            logger.warn("Failed to enqueue operation {}", node, e);
         }
     }
+
+    @Override
+    public void notifyNode(Node node, Action action) {
+        enqueue(node);
+    }
 }