Squashed commit of the following:
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
index 696601b1b7236f4ebcd4bb702829de315e973e12..3baefebc4e14fa1879b8d1c21ede9c63c496bf7a 100644 (file)
 
 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
 
-import java.util.List;
-import java.util.Map;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.ovsdb.openstack.netvirt.MdsalUtils;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
 
+    private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
     private List<Service> staticPipeline = Lists.newArrayList(
-                                                                Service.CLASSIFIER,
-                                                                Service.DIRECTOR,
-                                                                Service.ARP_RESPONDER,
-                                                                Service.INBOUND_NAT,
-                                                                Service.INGRESS_ACL,
-                                                                Service.LOAD_BALANCER,
-                                                                Service.ROUTING,
-                                                                Service.L2_REWRITE,
-                                                                Service.L2_FORWARDING,
-                                                                Service.EGRESS_ACL,
-                                                                Service.OUTBOUND_NAT
-                                                              );
+            Service.CLASSIFIER,
+            Service.ARP_RESPONDER,
+            Service.INBOUND_NAT,
+            Service.EGRESS_ACL,
+            Service.LOAD_BALANCER,
+            Service.ROUTING,
+            Service.L3_FORWARDING,
+            Service.L2_REWRITE,
+            Service.INGRESS_ACL,
+            Service.OUTBOUND_NAT,
+            Service.L2_FORWARDING
+    );
     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-
+    private volatile BlockingQueue<Node> queue;
+    private ExecutorService eventHandler;
     public PipelineOrchestratorImpl() {
     }
-    @Override
-    public void registerService(Service service,
-            AbstractServiceInstance serviceInstance) {
+
+    public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
+        Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
+        logger.info("registerService {} - {}", serviceInstance, service);
         serviceRegistry.put(service, serviceInstance);
     }
 
-    @Override
-    public void unregisterService(Service service) {
-        serviceRegistry.remove(service);
+    public void unregisterService(final ServiceReference ref) {
+        serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
     }
-
     @Override
     public Service getNextServiceInPipeline(Service service) {
         int index = staticPipeline.indexOf(service);
@@ -58,4 +69,71 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
         if (service == null) return null;
         return serviceRegistry.get(service);
     }
+
+    public void init() {
+        eventHandler = Executors.newSingleThreadExecutor();
+        this.queue = new LinkedBlockingQueue<Node>();
+        logger.info(">>>>> init PipelineOrchestratorImpl");
+    }
+
+    public void start() {
+        eventHandler.submit(new Runnable()  {
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        Node node = queue.take();
+                        /*
+                         * Since we are hooking on OpendaylightInventoryListener and as observed in
+                         * Bug 1997 multiple Threads trying to write to a same table at the same time
+                         * causes programming issues. Hence delaying the programming by a second to
+                         * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
+                         */
+                        logger.info(">>>>> dequeue: {}", node);
+                        Thread.sleep(1000);
+                        for (Service service : staticPipeline) {
+                            AbstractServiceInstance serviceInstance = getServiceInstance(service);
+                            //logger.info("pipeline: {} - {}", service, serviceInstance);
+                            if (serviceInstance != null) {
+                                if (MdsalUtils.getBridge(node) != null) {
+                                    serviceInstance.programDefaultPipelineRule(node);
+                                }
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Processing interrupted, terminating ", e);
+                }
+
+                while (!queue.isEmpty()) {
+                    queue.poll();
+                }
+                queue = null;
+            }
+        });
+    }
+
+    public void stop() {
+        queue.clear();
+        eventHandler.shutdownNow();
+    }
+
+    @Override
+    public void enqueue(Node node) {
+        logger.info(">>>>> enqueue: {}", node);
+        try {
+            queue.put(node);
+        } catch (InterruptedException e) {
+            logger.warn("Failed to enqueue operation {}", node, e);
+        }
+    }
+
+    @Override
+    public void notifyNode(Node node, Action action) {
+        if (action == Action.ADD) {
+            enqueue(node);
+        } else {
+            logger.info("update ignored: {}", node);
+        }
+    }
 }