package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
-
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.opendaylight.ovsdb.openstack.netvirt.MdsalUtils;
+import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
+import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class PipelineOrchestratorImpl implements PipelineOrchestrator {
+public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
+ private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
private List<Service> staticPipeline = Lists.newArrayList(
- Service.CLASSIFIER,
- Service.ARP_RESPONDER,
- Service.INBOUND_NAT,
- Service.INGRESS_ACL,
- Service.LOAD_BALANCER,
- Service.ROUTING,
- Service.L2_REWRITE,
- Service.L2_FORWARDING,
- Service.EGRESS_ACL,
- Service.OUTBOUND_NAT
- );
+ Service.CLASSIFIER,
+ Service.ARP_RESPONDER,
+ Service.INBOUND_NAT,
+ Service.EGRESS_ACL,
+ Service.LOAD_BALANCER,
+ Service.ROUTING,
+ Service.L3_FORWARDING,
+ Service.L2_REWRITE,
+ Service.INGRESS_ACL,
+ Service.OUTBOUND_NAT,
+ Service.L2_FORWARDING
+ );
Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
-
+ private volatile BlockingQueue<Node> queue;
+ private ExecutorService eventHandler;
public PipelineOrchestratorImpl() {
}
public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
+ logger.info("registerService {} - {}", serviceInstance, service);
serviceRegistry.put(service, serviceInstance);
}
if (service == null) return null;
return serviceRegistry.get(service);
}
+
+ public void init() {
+ eventHandler = Executors.newSingleThreadExecutor();
+ this.queue = new LinkedBlockingQueue<Node>();
+ logger.info(">>>>> init PipelineOrchestratorImpl");
+ }
+
+ public void start() {
+ eventHandler.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ Node node = queue.take();
+ /*
+ * Since we are hooking on OpendaylightInventoryListener and as observed in
+ * Bug 1997 multiple Threads trying to write to a same table at the same time
+ * causes programming issues. Hence delaying the programming by a second to
+ * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
+ */
+ logger.info(">>>>> dequeue: {}", node);
+ Thread.sleep(1000);
+ for (Service service : staticPipeline) {
+ AbstractServiceInstance serviceInstance = getServiceInstance(service);
+ //logger.info("pipeline: {} - {}", service, serviceInstance);
+ if (serviceInstance != null) {
+ if (MdsalUtils.getBridge(node) != null) {
+ serviceInstance.programDefaultPipelineRule(node);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Processing interrupted, terminating ", e);
+ }
+
+ while (!queue.isEmpty()) {
+ queue.poll();
+ }
+ queue = null;
+ }
+ });
+ }
+
+ public void stop() {
+ queue.clear();
+ eventHandler.shutdownNow();
+ }
+
+ @Override
+ public void enqueue(Node node) {
+ logger.info(">>>>> enqueue: {}", node);
+ try {
+ queue.put(node);
+ } catch (InterruptedException e) {
+ logger.warn("Failed to enqueue operation {}", node, e);
+ }
+ }
+
+ @Override
+ public void notifyNode(Node node, Action action) {
+ if (action == Action.ADD) {
+ enqueue(node);
+ } else {
+ logger.info("update ignored: {}", node);
+ }
+ }
}