package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
+import org.opendaylight.ovsdb.openstack.netvirt.MdsalUtils;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
private List<Service> staticPipeline = Lists.newArrayList(
- Service.CLASSIFIER,
- Service.ARP_RESPONDER,
- Service.INBOUND_NAT,
- Service.EGRESS_ACL,
- Service.LOAD_BALANCER,
- Service.ROUTING,
- Service.L3_FORWARDING,
- Service.L2_REWRITE,
- Service.INGRESS_ACL,
- Service.OUTBOUND_NAT,
- Service.L2_FORWARDING
- );
+ Service.CLASSIFIER,
+ Service.ARP_RESPONDER,
+ Service.INBOUND_NAT,
+ Service.EGRESS_ACL,
+ Service.LOAD_BALANCER,
+ Service.ROUTING,
+ Service.L3_FORWARDING,
+ Service.L2_REWRITE,
+ Service.INGRESS_ACL,
+ Service.OUTBOUND_NAT,
+ Service.L2_FORWARDING
+ );
Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
- private volatile MdsalConsumer mdsalConsumer;
- private volatile BlockingQueue<String> queue;
+ private volatile BlockingQueue<Node> queue;
private ExecutorService eventHandler;
public PipelineOrchestratorImpl() {
}
public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
+ logger.info("registerService {} - {}", serviceInstance, service);
serviceRegistry.put(service, serviceInstance);
}
public void init() {
eventHandler = Executors.newSingleThreadExecutor();
- this.queue = new LinkedBlockingQueue<String>();
- NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
- if (notificationService != null) {
- notificationService.registerNotificationListener(this);
- }
+ this.queue = new LinkedBlockingQueue<Node>();
+ logger.info(">>>>> init PipelineOrchestratorImpl");
}
+
public void start() {
eventHandler.submit(new Runnable() {
@Override
public void run() {
try {
while (true) {
- String nodeId = queue.take();
+ Node node = queue.take();
/*
* Since we are hooking on OpendaylightInventoryListener and as observed in
* Bug 1997 multiple Threads trying to write to a same table at the same time
* causes programming issues. Hence delaying the programming by a second to
* avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
*/
+ logger.info(">>>>> dequeue: {}", node);
Thread.sleep(1000);
for (Service service : staticPipeline) {
AbstractServiceInstance serviceInstance = getServiceInstance(service);
- serviceInstance.programDefaultPipelineRule(nodeId);
+ //logger.info("pipeline: {} - {}", service, serviceInstance);
+ if (serviceInstance != null) {
+ if (MdsalUtils.getBridge(node) != null) {
+ serviceInstance.programDefaultPipelineRule(node);
+ }
+ }
}
}
} catch (Exception e) {
eventHandler.shutdownNow();
}
- void enqueue(String nodeId) {
+ @Override
+ public void enqueue(Node node) {
+ logger.info(">>>>> enqueue: {}", node);
try {
- queue.put(new String(nodeId));
+ queue.put(node);
} catch (InterruptedException e) {
- logger.warn("Failed to enqueue operation {}", nodeId, e);
+ logger.warn("Failed to enqueue operation {}", node, e);
}
}
-
- /**
- * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
- * programming the Pipeline specific flows.
- */
@Override
- public void onNodeUpdated(NodeUpdated nodeUpdated) {
- NodeRef ref = nodeUpdated.getNodeRef();
- InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
- logger.debug("Node Update received for : "+identifier.toString());
- final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
- final String nodeId = key.getId().getValue();
- if (key != null && key.getId().getValue().contains("openflow")) {
- InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
- InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
- final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
- BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
- CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
- Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
- @Override
- public void onSuccess(Optional<? extends DataObject> optional) {
- if (!optional.isPresent()) {
- enqueue(nodeId);
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
- enqueue(nodeId);
- }
- });
+ public void notifyNode(Node node, Action action) {
+ if (action == Action.ADD) {
+ enqueue(node);
+ } else {
+ logger.info("update ignored: {}", node);
}
}
-
- @Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
- }
-
- @Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- }
-
- @Override
- public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void onNodeRemoved(NodeRemoved arg0) {
- // TODO Auto-generated method stub
-
- }
-
}