X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openstack%2Fnet-virt-providers%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fovsdb%2Fopenstack%2Fnetvirt%2Fproviders%2Fopenflow13%2FPipelineOrchestratorImpl.java;h=299ab21a08b76f4261a5fceee18f5e49cf8c7046;hb=73dcc9092351e52836ad9970f0a15fe12d71108e;hp=b7d48671039b22649d1edc07e95bdae776f08d8c;hpb=c6efe31a199b6aba3c346da19c3fb4c2e70709af;p=netvirt.git diff --git a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java index b7d4867103..299ab21a08 100644 --- a/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java +++ b/openstack/net-virt-providers/src/main/java/org/opendaylight/ovsdb/openstack/netvirt/providers/openflow13/PipelineOrchestratorImpl.java @@ -1,27 +1,25 @@ /* - * Copyright (C) 2014 Red Hat, Inc. + * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html - * - * Authors : Dave Tucker, Madhu Venugopal */ package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound; + import org.opendaylight.ovsdb.openstack.netvirt.api.Action; import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener; import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager; +import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound; import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface; import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; @@ -30,8 +28,16 @@ import org.osgi.framework.ServiceReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator { - private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class); + + public List getStaticPipeline() { + return staticPipeline; + } + private List staticPipeline = Lists.newArrayList( Service.CLASSIFIER, Service.ARP_RESPONDER, @@ -45,6 +51,11 @@ public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListe Service.OUTBOUND_NAT, Service.L2_FORWARDING ); + + public Map getServiceRegistry() { + return serviceRegistry; + } + Map serviceRegistry = Maps.newConcurrentMap(); private volatile BlockingQueue queue; private ExecutorService eventHandler; @@ -52,15 +63,32 @@ public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListe public PipelineOrchestratorImpl() { eventHandler = Executors.newSingleThreadExecutor(); - this.queue = new LinkedBlockingQueue(); - logger.info("PipelineOrchestratorImpl constructor"); + this.queue = new LinkedBlockingQueue<>(); + LOG.info("PipelineOrchestratorImpl constructor"); start(); } public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){ Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY); - logger.info("registerService {} - {}", serviceInstance, service); + LOG.info("registerService {} - {}", serviceInstance, service); serviceRegistry.put(service, serviceInstance); + // insert the service if not already there. The list is ordered based of table ID. + if (!staticPipeline.contains(service) && !isTableInPipeline(service.getTable())) { + staticPipeline.add(service); + Collections.sort(staticPipeline, Service.insertComparator); + } + LOG.info("registerService: {}", staticPipeline); + } + + private boolean isTableInPipeline (short tableId) { + boolean found = false; + for (Service service : staticPipeline) { + if (service.getTable() == tableId) { + found = true; + break; + } + } + return found; } public void unregisterService(final ServiceReference ref) { @@ -83,33 +111,23 @@ public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListe return serviceRegistry.get(service); } - public void start() { + public final void start() { eventHandler.submit(new Runnable() { @Override public void run() { try { while (true) { Node node = queue.take(); - /* - * Since we are hooking on OpendaylightInventoryListener and as observed in - * Bug 1997 multiple Threads trying to write to a same table at the same time - * causes programming issues. Hence delaying the programming by a second to - * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved. - */ - logger.info(">>>>> dequeue: {}", node); - Thread.sleep(1000); + LOG.info(">>>>> dequeue: {}", node); for (Service service : staticPipeline) { AbstractServiceInstance serviceInstance = getServiceInstance(service); - //logger.info("pipeline: {} - {}", service, serviceInstance); - if (serviceInstance != null) { - if (southbound.getBridge(node) != null) { - serviceInstance.programDefaultPipelineRule(node); - } + if (serviceInstance != null && southbound.getBridge(node) != null) { + serviceInstance.programDefaultPipelineRule(node); } } } } catch (Exception e) { - logger.warn("Processing interrupted, terminating ", e); + LOG.warn("Processing interrupted, terminating ", e); } while (!queue.isEmpty()) { @@ -127,11 +145,11 @@ public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListe @Override public void enqueue(Node node) { - logger.info(">>>>> enqueue: {}", node); + LOG.info(">>>>> enqueue: {}", node); try { queue.put(node); } catch (InterruptedException e) { - logger.warn("Failed to enqueue operation {}", node, e); + LOG.warn("Failed to enqueue operation {}", node, e); } } @@ -140,7 +158,7 @@ public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListe if (action == Action.ADD) { enqueue(node); } else { - logger.info("update ignored: {}", node); + LOG.info("update ignored: {}", node); } }