From ccd6cffd0ee433800e6da24103f03cdaecb6f76b Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Fri, 5 Sep 2014 03:27:11 -0700 Subject: [PATCH] Moving the InventoryListener out of AbstractServiceInstance and into a common place PipelineOrchestratorImpl Change-Id: Ic443bfcb160122f9e5e943aa64c36a73034a6c88 Signed-off-by: Madhu Venugopal --- .../netvirt/providers/Activator.java | 1 + .../openflow13/AbstractServiceInstance.java | 124 +-------------- .../openflow13/PipelineOrchestratorImpl.java | 150 +++++++++++++++++- 3 files changed, 150 insertions(+), 125 deletions(-) diff --git a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/Activator.java b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/Activator.java index bdedccb8e..cffedb68b 100644 --- a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/Activator.java +++ b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/Activator.java @@ -183,6 +183,7 @@ public class Activator extends ComponentActivatorAbstractBase { c.add(createServiceDependency() .setService(AbstractServiceInstance.class) .setCallbacks("registerService", "unregisterService")); + c.add(createServiceDependency().setService(MdsalConsumer.class).setRequired(true)); } if (AbstractServiceInstance.class.isAssignableFrom((Class) imp)) { diff --git a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/AbstractServiceInstance.java b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/AbstractServiceInstance.java index 5c071454a..a9fc77843 100644 --- a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/AbstractServiceInstance.java +++ b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/AbstractServiceInstance.java @@ -10,21 +10,14 @@ package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingDeque; -import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.ovsdb.utils.mdsal.openflow.InstructionUtils; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; @@ -38,20 +31,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.M import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionKey; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +43,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; /** * Any ServiceInstance class that extends AbstractServiceInstance to be a part of the pipeline @@ -70,7 +52,7 @@ import com.google.common.util.concurrent.Futures; * use it in any matching flows that needs to be further processed by next service in the pipeline. * */ -public abstract class AbstractServiceInstance implements OpendaylightInventoryListener, Runnable, TransactionChainListener { +public abstract class AbstractServiceInstance { public static final String SERVICE_PROPERTY ="serviceProperty"; private static final Logger logger = LoggerFactory.getLogger(AbstractServiceInstance.class); public static final String OPENFLOW = "openflow:"; @@ -81,10 +63,6 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi // Concrete Service that this AbstractServiceInstance represent private Service service; - // Process Notification in its own thread - Thread thread = null; - private final BlockingQueue queue = new LinkedBlockingDeque<>(); - public AbstractServiceInstance (Service service) { this.service = service; } @@ -104,20 +82,6 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi this.service = service; } - public void start() { - // Register for OpenFlow bridge/node Creation notification. - NotificationProviderService notificationService = mdsalConsumer.getNotificationService(); - if (notificationService != null) { - notificationService.registerNotificationListener(this); - } - - // Never block a Notification thread. Process the notification in its own Thread. - thread = new Thread(this); - thread.setDaemon(true); - thread.setName("AbstractServiceInstance-"+service.toString()); - thread.start(); - } - public NodeBuilder createNodeBuilder(String nodeId) { NodeBuilder builder = new NodeBuilder(); builder.setId(new NodeId(nodeId)); @@ -279,90 +243,4 @@ public abstract class AbstractServiceInstance implements OpendaylightInventoryLi flowBuilder.setIdleTimeout(0); writeFlow(flowBuilder, nodeBuilder); } - - @Override - public void onNodeConnectorRemoved(NodeConnectorRemoved nodeConector) { - } - - @Override - public void onNodeConnectorUpdated(NodeConnectorUpdated nodeConnector) { - } - - @Override - public void onNodeRemoved(NodeRemoved node) { - } - - - @Override - public void run() { - try { - for (; ; ) { - String nodeId = queue.take(); - this.programDefaultPipelineRule(nodeId); - } - } catch (InterruptedException e) { - logger.warn("Processing interrupted, terminating", e); - } - - while (!queue.isEmpty()) { - queue.poll(); - } - - } - - void enqueue(final String nodeId) { - try { - queue.put(nodeId); - } catch (InterruptedException e) { - logger.warn("Failed to enqueue operation {}", nodeId, e); - } - } - - /** - * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before - * programming the Pipeline specific flows. - */ - @Override - public void onNodeUpdated(NodeUpdated nodeUpdated) { - NodeRef ref = nodeUpdated.getNodeRef(); - InstanceIdentifier identifier = (InstanceIdentifier) ref.getValue(); - logger.info("GOT NOTIFICATION FOR "+identifier.toString()); - final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class); - final String nodeId = key.getId().getValue(); - if (!this.isBridgeInPipeline(nodeId)) { - logger.debug("Bridge {} is not in pipeline", nodeId); - return; - } - if (key != null && key.getId().getValue().contains("openflow")) { - InstanceIdentifierBuilder builder = ((InstanceIdentifier) ref.getValue()).builder(); - InstanceIdentifierBuilder augmentation = builder.augmentation(FlowCapableNode.class); - final InstanceIdentifier path = augmentation.build(); - BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this); - CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path); - Futures.addCallback(readFuture, new FutureCallback>() { - @Override - public void onSuccess(Optional optional) { - if (!optional.isPresent()) { - enqueue(nodeId); - } - } - - @Override - public void onFailure(Throwable throwable) { - logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId)); - enqueue(nodeId); - } - }); - } - } - - @Override - public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, - final Throwable cause) { - logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause); - } - - @Override - public void onTransactionChainSuccessful(final TransactionChain chain) { - } } diff --git a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java index 3b5165c7d..5264c93b9 100644 --- a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java +++ b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java @@ -12,14 +12,43 @@ package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder; import org.osgi.framework.ServiceReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; -public class PipelineOrchestratorImpl implements PipelineOrchestrator { +public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener { + private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class); private List staticPipeline = Lists.newArrayList( Service.CLASSIFIER, Service.ARP_RESPONDER, @@ -33,7 +62,9 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator { Service.OUTBOUND_NAT ); Map serviceRegistry = Maps.newConcurrentMap(); - + private volatile MdsalConsumer mdsalConsumer; + private volatile BlockingQueue queue; + private ExecutorService eventHandler; public PipelineOrchestratorImpl() { } @@ -57,4 +88,119 @@ public class PipelineOrchestratorImpl implements PipelineOrchestrator { if (service == null) return null; return serviceRegistry.get(service); } + + public void init() { + eventHandler = Executors.newSingleThreadExecutor(); + this.queue = new LinkedBlockingQueue(); + NotificationProviderService notificationService = mdsalConsumer.getNotificationService(); + if (notificationService != null) { + notificationService.registerNotificationListener(this); + } + } + public void start() { + eventHandler.submit(new Runnable() { + @Override + public void run() { + try { + while (true) { + String nodeId = queue.take(); + for (Service service : staticPipeline) { + AbstractServiceInstance serviceInstance = getServiceInstance(service); + if (!serviceInstance.isBridgeInPipeline(nodeId)) { + logger.debug("Bridge {} is not in pipeline", nodeId); + continue; + } + + serviceInstance.programDefaultPipelineRule(nodeId); + } + } + } catch (Exception e) { + logger.warn("Processing interrupted, terminating ", e); + e.printStackTrace(); + } + + while (!queue.isEmpty()) { + queue.poll(); + } + queue = null; + } + }); + } + + public void stop() { + queue.clear(); + eventHandler.shutdownNow(); + } + + void enqueue(String nodeId) { + try { + queue.put(new String(nodeId)); + } catch (InterruptedException e) { + logger.warn("Failed to enqueue operation {}", nodeId, e); + } + } + + + /** + * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before + * programming the Pipeline specific flows. + */ + @Override + public void onNodeUpdated(NodeUpdated nodeUpdated) { + NodeRef ref = nodeUpdated.getNodeRef(); + InstanceIdentifier identifier = (InstanceIdentifier) ref.getValue(); + logger.debug("Node Update received for : "+identifier.toString()); + final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class); + final String nodeId = key.getId().getValue(); + if (key != null && key.getId().getValue().contains("openflow")) { + InstanceIdentifierBuilder builder = ((InstanceIdentifier) ref.getValue()).builder(); + InstanceIdentifierBuilder augmentation = builder.augmentation(FlowCapableNode.class); + final InstanceIdentifier path = augmentation.build(); + BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this); + CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path); + Futures.addCallback(readFuture, new FutureCallback>() { + @Override + public void onSuccess(Optional optional) { + if (!optional.isPresent()) { + enqueue(nodeId); + } + } + + @Override + public void onFailure(Throwable throwable) { + logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId)); + enqueue(nodeId); + } + }); + } + } + + @Override + public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, + final Throwable cause) { + logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause); + } + + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + } + + @Override + public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) { + // TODO Auto-generated method stub + + } + + @Override + public void onNodeRemoved(NodeRemoved arg0) { + // TODO Auto-generated method stub + + } + } -- 2.36.6