Squashed commit of the following:
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
index 94b6555c4e64e8f09dce48d4f9e3bbbd045a43cf..3baefebc4e14fa1879b8d1c21ede9c63c496bf7a 100644 (file)
 
 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.opendaylight.ovsdb.openstack.netvirt.MdsalUtils;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
 
     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
     private List<Service> staticPipeline = Lists.newArrayList(
-                                                                Service.CLASSIFIER,
-                                                                Service.ARP_RESPONDER,
-                                                                Service.INBOUND_NAT,
-                                                                Service.INGRESS_ACL,
-                                                                Service.LOAD_BALANCER,
-                                                                Service.ROUTING,
-                                                                Service.L3_FORWARDING,
-                                                                Service.L2_REWRITE,
-                                                                Service.EGRESS_ACL,
-                                                                Service.OUTBOUND_NAT,
-                                                                Service.L2_FORWARDING
-                                                              );
+            Service.CLASSIFIER,
+            Service.ARP_RESPONDER,
+            Service.INBOUND_NAT,
+            Service.EGRESS_ACL,
+            Service.LOAD_BALANCER,
+            Service.ROUTING,
+            Service.L3_FORWARDING,
+            Service.L2_REWRITE,
+            Service.INGRESS_ACL,
+            Service.OUTBOUND_NAT,
+            Service.L2_FORWARDING
+    );
     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-    private volatile MdsalConsumer mdsalConsumer;
-    private volatile BlockingQueue<String> queue;
+    private volatile BlockingQueue<Node> queue;
     private ExecutorService eventHandler;
     public PipelineOrchestratorImpl() {
     }
 
     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
+        logger.info("registerService {} - {}", serviceInstance, service);
         serviceRegistry.put(service, serviceInstance);
     }
 
@@ -92,29 +72,33 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator, Opendayli
 
     public void init() {
         eventHandler = Executors.newSingleThreadExecutor();
-        this.queue = new LinkedBlockingQueue<String>();
-        NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
-        if (notificationService != null) {
-            notificationService.registerNotificationListener(this);
-        }
+        this.queue = new LinkedBlockingQueue<Node>();
+        logger.info(">>>>> init PipelineOrchestratorImpl");
     }
+
     public void start() {
         eventHandler.submit(new Runnable()  {
             @Override
             public void run() {
                 try {
                     while (true) {
-                        String nodeId = queue.take();
+                        Node node = queue.take();
                         /*
                          * Since we are hooking on OpendaylightInventoryListener and as observed in
                          * Bug 1997 multiple Threads trying to write to a same table at the same time
                          * causes programming issues. Hence delaying the programming by a second to
                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
                          */
+                        logger.info(">>>>> dequeue: {}", node);
                         Thread.sleep(1000);
                         for (Service service : staticPipeline) {
                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
-                            serviceInstance.programDefaultPipelineRule(nodeId);
+                            //logger.info("pipeline: {} - {}", service, serviceInstance);
+                            if (serviceInstance != null) {
+                                if (MdsalUtils.getBridge(node) != null) {
+                                    serviceInstance.programDefaultPipelineRule(node);
+                                }
+                            }
                         }
                     }
                 } catch (Exception e) {
@@ -134,75 +118,22 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator, Opendayli
         eventHandler.shutdownNow();
     }
 
-    void enqueue(String nodeId) {
+    @Override
+    public void enqueue(Node node) {
+        logger.info(">>>>> enqueue: {}", node);
         try {
-            queue.put(new String(nodeId));
+            queue.put(node);
         } catch (InterruptedException e) {
-            logger.warn("Failed to enqueue operation {}", nodeId, e);
+            logger.warn("Failed to enqueue operation {}", node, e);
         }
     }
 
-
-    /**
-     * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
-     * programming the Pipeline specific flows.
-     */
     @Override
-    public void onNodeUpdated(NodeUpdated nodeUpdated) {
-        NodeRef ref = nodeUpdated.getNodeRef();
-        InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
-        logger.debug("Node Update received for : "+identifier.toString());
-        final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
-        final String nodeId = key.getId().getValue();
-        if (key != null && key.getId().getValue().contains("openflow")) {
-            InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
-            InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
-            final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
-            BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
-            CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
-            Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
-                @Override
-                public void onSuccess(Optional<? extends DataObject> optional) {
-                    if (!optional.isPresent()) {
-                        enqueue(nodeId);
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable throwable) {
-                    logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
-                    enqueue(nodeId);
-                }
-            });
+    public void notifyNode(Node node, Action action) {
+        if (action == Action.ADD) {
+            enqueue(node);
+        } else {
+            logger.info("update ignored: {}", node);
         }
     }
-
-    @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
-            final Throwable cause) {
-        logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
-    }
-
-    @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-    }
-
-    @Override
-    public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void onNodeRemoved(NodeRemoved arg0) {
-        // TODO Auto-generated method stub
-
-    }
-
 }