Moving the InventoryListener out of AbstractServiceInstance and into a common place...
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
index 3b5165c7d732abe99d7279deff6a312e85da58fd..5264c93b9064c2bd5c5a7c1f6a7e234d4190eb54 100644 (file)
@@ -12,14 +12,43 @@ package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
 
+    private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
     private List<Service> staticPipeline = Lists.newArrayList(
                                                                 Service.CLASSIFIER,
                                                                 Service.ARP_RESPONDER,
@@ -33,7 +62,9 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
                                                                 Service.OUTBOUND_NAT
                                                               );
     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-
+    private volatile MdsalConsumer mdsalConsumer;
+    private volatile BlockingQueue<String> queue;
+    private ExecutorService eventHandler;
     public PipelineOrchestratorImpl() {
     }
 
@@ -57,4 +88,119 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
         if (service == null) return null;
         return serviceRegistry.get(service);
     }
+
+    public void init() {
+        eventHandler = Executors.newSingleThreadExecutor();
+        this.queue = new LinkedBlockingQueue<String>();
+        NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
+        if (notificationService != null) {
+            notificationService.registerNotificationListener(this);
+        }
+    }
+    public void start() {
+        eventHandler.submit(new Runnable()  {
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        String nodeId = queue.take();
+                        for (Service service : staticPipeline) {
+                            AbstractServiceInstance serviceInstance = getServiceInstance(service);
+                            if (!serviceInstance.isBridgeInPipeline(nodeId)) {
+                                logger.debug("Bridge {} is not in pipeline", nodeId);
+                                continue;
+                            }
+
+                            serviceInstance.programDefaultPipelineRule(nodeId);
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Processing interrupted, terminating ", e);
+                    e.printStackTrace();
+                }
+
+                while (!queue.isEmpty()) {
+                    queue.poll();
+                }
+                queue = null;
+            }
+        });
+    }
+
+    public void stop() {
+        queue.clear();
+        eventHandler.shutdownNow();
+    }
+
+    void enqueue(String nodeId) {
+        try {
+            queue.put(new String(nodeId));
+        } catch (InterruptedException e) {
+            logger.warn("Failed to enqueue operation {}", nodeId, e);
+        }
+    }
+
+
+    /**
+     * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
+     * programming the Pipeline specific flows.
+     */
+    @Override
+    public void onNodeUpdated(NodeUpdated nodeUpdated) {
+        NodeRef ref = nodeUpdated.getNodeRef();
+        InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
+        logger.debug("Node Update received for : "+identifier.toString());
+        final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
+        final String nodeId = key.getId().getValue();
+        if (key != null && key.getId().getValue().contains("openflow")) {
+            InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
+            InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
+            final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
+            BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
+            CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
+            Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
+                @Override
+                public void onSuccess(Optional<? extends DataObject> optional) {
+                    if (!optional.isPresent()) {
+                        enqueue(nodeId);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable throwable) {
+                    logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
+                    enqueue(nodeId);
+                }
+            });
+        }
+    }
+
+    @Override
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
+            final Throwable cause) {
+        logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+    }
+
+    @Override
+    public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onNodeRemoved(NodeRemoved arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
 }