Moving the InventoryListener out of AbstractServiceInstance and into a common place... 43/10843/3
authorMadhu Venugopal <mavenugo@gmail.com>
Fri, 5 Sep 2014 10:27:11 +0000 (03:27 -0700)
committerMadhu Venugopal <mavenugo@gmail.com>
Fri, 5 Sep 2014 12:22:47 +0000 (05:22 -0700)
Change-Id: Ic443bfcb160122f9e5e943aa64c36a73034a6c88
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/Activator.java
openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/AbstractServiceInstance.java
openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java

index bdedccb8eb02cc43f353e28f274e189f2c4c9761..cffedb68bd46acafc44613f6ed87374d73384fdb 100644 (file)
@@ -183,6 +183,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             c.add(createServiceDependency()
                            .setService(AbstractServiceInstance.class)
                            .setCallbacks("registerService", "unregisterService"));
+            c.add(createServiceDependency().setService(MdsalConsumer.class).setRequired(true));
         }
 
         if (AbstractServiceInstance.class.isAssignableFrom((Class) imp)) {
index 5c071454abac97cbae7f4ff2990a3f8ca03e185e..a9fc778431b09fa36425e12010dca8942009c3ac 100644 (file)
 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
 
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
 
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.ovsdb.utils.mdsal.openflow.InstructionUtils;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
@@ -38,20 +31,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.M
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,8 +43,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 
 /**
  * Any ServiceInstance class that extends AbstractServiceInstance to be a part of the pipeline
@@ -70,7 +52,7 @@ import com.google.common.util.concurrent.Futures;
  *    use it in any matching flows that needs to be further processed by next service in the pipeline.
  *
  */
-public abstract class AbstractServiceInstance implements OpendaylightInventoryListener, Runnable, TransactionChainListener {
+public abstract class AbstractServiceInstance {
     public static final String SERVICE_PROPERTY ="serviceProperty";
     private static final Logger logger = LoggerFactory.getLogger(AbstractServiceInstance.class);
     public static final String OPENFLOW = "openflow:";
@@ -81,10 +63,6 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi
     // Concrete Service that this AbstractServiceInstance represent
     private Service service;
 
-    // Process Notification in its own thread
-    Thread thread = null;
-    private final BlockingQueue<String> queue = new LinkedBlockingDeque<>();
-
     public AbstractServiceInstance (Service service) {
         this.service = service;
     }
@@ -104,20 +82,6 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi
         this.service = service;
     }
 
-    public void start() {
-        // Register for OpenFlow bridge/node Creation notification.
-        NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
-        if (notificationService != null) {
-            notificationService.registerNotificationListener(this);
-        }
-
-        // Never block a Notification thread. Process the notification in its own Thread.
-        thread = new Thread(this);
-        thread.setDaemon(true);
-        thread.setName("AbstractServiceInstance-"+service.toString());
-        thread.start();
-    }
-
     public NodeBuilder createNodeBuilder(String nodeId) {
         NodeBuilder builder = new NodeBuilder();
         builder.setId(new NodeId(nodeId));
@@ -279,90 +243,4 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi
         flowBuilder.setIdleTimeout(0);
         writeFlow(flowBuilder, nodeBuilder);
     }
-
-    @Override
-    public void onNodeConnectorRemoved(NodeConnectorRemoved nodeConector) {
-    }
-
-    @Override
-    public void onNodeConnectorUpdated(NodeConnectorUpdated nodeConnector) {
-    }
-
-    @Override
-    public void onNodeRemoved(NodeRemoved node) {
-    }
-
-
-    @Override
-    public void run() {
-        try {
-            for (; ; ) {
-                String nodeId = queue.take();
-                this.programDefaultPipelineRule(nodeId);
-            }
-        } catch (InterruptedException e) {
-            logger.warn("Processing interrupted, terminating", e);
-        }
-
-        while (!queue.isEmpty()) {
-            queue.poll();
-        }
-
-    }
-
-    void enqueue(final String nodeId) {
-        try {
-            queue.put(nodeId);
-        } catch (InterruptedException e) {
-            logger.warn("Failed to enqueue operation {}", nodeId, e);
-        }
-    }
-
-    /**
-     * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
-     * programming the Pipeline specific flows.
-     */
-    @Override
-    public void onNodeUpdated(NodeUpdated nodeUpdated) {
-        NodeRef ref = nodeUpdated.getNodeRef();
-        InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
-        logger.info("GOT NOTIFICATION FOR "+identifier.toString());
-        final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
-        final String nodeId = key.getId().getValue();
-        if (!this.isBridgeInPipeline(nodeId)) {
-            logger.debug("Bridge {} is not in pipeline", nodeId);
-            return;
-        }
-        if (key != null && key.getId().getValue().contains("openflow")) {
-            InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
-            InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
-            final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
-            BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
-            CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
-            Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
-                @Override
-                public void onSuccess(Optional<? extends DataObject> optional) {
-                    if (!optional.isPresent()) {
-                        enqueue(nodeId);
-                    }
-                }
-
-                @Override
-                public void onFailure(Throwable throwable) {
-                    logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
-                    enqueue(nodeId);
-                }
-            });
-        }
-    }
-
-    @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
-            final Throwable cause) {
-        logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
-    }
-
-    @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-    }
 }
index 3b5165c7d732abe99d7279deff6a312e85da58fd..5264c93b9064c2bd5c5a7c1f6a7e234d4190eb54 100644 (file)
@@ -12,14 +12,43 @@ package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
 
+    private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
     private List<Service> staticPipeline = Lists.newArrayList(
                                                                 Service.CLASSIFIER,
                                                                 Service.ARP_RESPONDER,
@@ -33,7 +62,9 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
                                                                 Service.OUTBOUND_NAT
                                                               );
     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-
+    private volatile MdsalConsumer mdsalConsumer;
+    private volatile BlockingQueue<String> queue;
+    private ExecutorService eventHandler;
     public PipelineOrchestratorImpl() {
     }
 
@@ -57,4 +88,119 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator {
         if (service == null) return null;
         return serviceRegistry.get(service);
     }
+
+    public void init() {
+        eventHandler = Executors.newSingleThreadExecutor();
+        this.queue = new LinkedBlockingQueue<String>();
+        NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
+        if (notificationService != null) {
+            notificationService.registerNotificationListener(this);
+        }
+    }
+    public void start() {
+        eventHandler.submit(new Runnable()  {
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        String nodeId = queue.take();
+                        for (Service service : staticPipeline) {
+                            AbstractServiceInstance serviceInstance = getServiceInstance(service);
+                            if (!serviceInstance.isBridgeInPipeline(nodeId)) {
+                                logger.debug("Bridge {} is not in pipeline", nodeId);
+                                continue;
+                            }
+
+                            serviceInstance.programDefaultPipelineRule(nodeId);
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.warn("Processing interrupted, terminating ", e);
+                    e.printStackTrace();
+                }
+
+                while (!queue.isEmpty()) {
+                    queue.poll();
+                }
+                queue = null;
+            }
+        });
+    }
+
+    public void stop() {
+        queue.clear();
+        eventHandler.shutdownNow();
+    }
+
+    void enqueue(String nodeId) {
+        try {
+            queue.put(new String(nodeId));
+        } catch (InterruptedException e) {
+            logger.warn("Failed to enqueue operation {}", nodeId, e);
+        }
+    }
+
+
+    /**
+     * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
+     * programming the Pipeline specific flows.
+     */
+    @Override
+    public void onNodeUpdated(NodeUpdated nodeUpdated) {
+        NodeRef ref = nodeUpdated.getNodeRef();
+        InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
+        logger.debug("Node Update received for : "+identifier.toString());
+        final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
+        final String nodeId = key.getId().getValue();
+        if (key != null && key.getId().getValue().contains("openflow")) {
+            InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
+            InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
+            final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
+            BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
+            CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
+            Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
+                @Override
+                public void onSuccess(Optional<? extends DataObject> optional) {
+                    if (!optional.isPresent()) {
+                        enqueue(nodeId);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable throwable) {
+                    logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
+                    enqueue(nodeId);
+                }
+            });
+        }
+    }
+
+    @Override
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
+            final Throwable cause) {
+        logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+    }
+
+    @Override
+    public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void onNodeRemoved(NodeRemoved arg0) {
+        // TODO Auto-generated method stub
+
+    }
+
 }