2 * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
11 import com.google.common.collect.Lists;
12 import com.google.common.collect.Maps;
13 import java.util.List;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
23 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
24 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
25 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
26 import org.osgi.framework.BundleContext;
27 import org.osgi.framework.ServiceReference;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
/**
 * Keeps a registry of pipeline service instances and, for each node taken off
 * its internal queue, asks every registered service (in {@code staticPipeline}
 * order) to program its default pipeline rule on that node.
 *
 * NOTE(review): a number of lines of this class are not visible in this view;
 * the member comments below describe only the statements that can be seen.
 */
31 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
32 private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
// Ordered list of services; the worker task iterates it in this order and
// getNextServiceInPipeline uses the position of an entry to find its successor.
// NOTE(review): only some of the entries are visible in this view.
33 private List<Service> staticPipeline = Lists.newArrayList(
35 Service.ARP_RESPONDER,
38 Service.LOAD_BALANCER,
40 Service.L3_FORWARDING,
// Service -> instance lookup backed by a concurrent map; written by
// registerService / unregisterService and read by getServiceInstance.
46 Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
// Nodes waiting to be processed; the worker task blocks on take() from this queue.
47 private volatile BlockingQueue<Node> queue;
// Executor that runs the worker task; created as a single-thread executor in the constructor.
48 private ExecutorService eventHandler;
// Consulted for a node's bridge before a service programs it.
// NOTE(review): its assignment is not visible here - presumably done in setDependencies; confirm.
49 private Southbound southbound;
/**
 * Creates the single-thread executor used for event handling and a
 * {@link LinkedBlockingQueue} (no explicit capacity) for pending nodes, then
 * logs that the constructor ran.
 *
 * NOTE(review): the remainder of the constructor body is not visible in this view.
 */
51 public PipelineOrchestratorImpl() {
52 eventHandler = Executors.newSingleThreadExecutor();
53 this.queue = new LinkedBlockingQueue<>();
54 LOG.info("PipelineOrchestratorImpl constructor");
/**
 * Stores {@code serviceInstance} in the registry under the {@code Service} key
 * read from the reference's {@code AbstractServiceInstance.SERVICE_PROPERTY}
 * property, replacing any instance already stored for that key.
 *
 * NOTE(review): if the property is absent the key is {@code null}, which
 * concurrent maps normally reject - confirm every caller sets the property.
 *
 * @param ref service reference carrying the service-type property
 * @param serviceInstance instance to register for that service type
 */
58 public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
// The property value is cast to Service without a type check.
59 Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
60 LOG.info("registerService {} - {}", serviceInstance, service);
61 serviceRegistry.put(service, serviceInstance);
/**
 * Removes the registry entry whose key equals the reference's
 * {@code AbstractServiceInstance.SERVICE_PROPERTY} property value.
 *
 * @param ref service reference carrying the service-type property
 */
64 public void unregisterService(final ServiceReference ref) {
65 serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
/**
 * Finds the service that follows {@code service} in {@code staticPipeline}.
 *
 * @param service service whose successor is wanted
 * @return the entry at the next list position when {@code service} is not the
 *         last entry; the result for the last entry is decided by a branch
 *         whose body is not visible in this view
 */
68 public Service getNextServiceInPipeline(Service service) {
69 int index = staticPipeline.indexOf(service);
// True when the service is the last entry, so there is no next position.
// NOTE(review): the body of this branch is not visible here.
70 if (index >= staticPipeline.size() - 1) {
// NOTE(review): indexOf yields -1 for a service that is not in a non-empty list;
// that value passes the check above and makes this return the first entry - confirm intended.
73 return staticPipeline.get(index + 1);
/**
 * Looks up the instance registered for {@code service}.
 *
 * @param service service type to look up; {@code null} is handled by a
 *        separate branch whose body is not visible in this view
 * @return the registered instance, or {@code null} when the registry has no
 *         entry for a non-null {@code service}
 */
77 public AbstractServiceInstance getServiceInstance(Service service) {
// Guard taken before the map lookup. NOTE(review): branch body not visible here.
78 if (service == null) {
81 return serviceRegistry.get(service);
// Hands an anonymous Runnable to the executor held in eventHandler.
// NOTE(review): the enclosing method header and the Runnable's run()/try/loop
// scaffolding are not visible in this view.
85 eventHandler.submit(new Runnable() {
// Blocks until a node is available on the queue.
90 Node node = queue.take();
92 * Since we are hooking on OpendaylightInventoryListener and as observed in
93 * Bug 1997 multiple Threads trying to write to a same table at the same time
94 * causes programming issues. Hence delaying the programming by a second to
95 * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
97 LOG.info(">>>>> dequeue: {}", node);
// Walk the pipeline in list order; services without a registered instance are skipped.
99 for (Service service : staticPipeline) {
100 AbstractServiceInstance serviceInstance = getServiceInstance(service);
101 //LOG.info("pipeline: {} - {}", service, serviceInstance);
102 if (serviceInstance != null) {
// Program the default rule only when southbound reports a bridge for this node.
103 if (southbound.getBridge(node) != null) {
104 serviceInstance.programDefaultPipelineRule(node);
// Catches any Exception raised in the preceding try block (its opening is not
// visible here) and only logs it at WARN level.
109 } catch (Exception e) {
110 LOG.warn("Processing interrupted, terminating ", e);
// Loops for as long as the queue still holds nodes.
// NOTE(review): the loop body is not visible in this view.
113 while (!queue.isEmpty()) {
// Requests immediate shutdown of the executor that runs the worker task.
// NOTE(review): the enclosing method is not visible in this view.
123 eventHandler.shutdownNow();
/**
 * Logs the node being enqueued; an {@link InterruptedException} raised by the
 * guarded statements is logged at WARN level and not rethrown.
 *
 * NOTE(review): the try block itself (presumably the queue insertion) is not
 * visible in this view.
 *
 * @param node node to hand to the worker task
 */
127 public void enqueue(Node node) {
128 LOG.info(">>>>> enqueue: {}", node);
// NOTE(review): the interruption is only logged in the visible lines; the
// thread's interrupt flag is not restored here.
131 } catch (InterruptedException e) {
132 LOG.warn("Failed to enqueue operation {}", node, e);
/**
 * Callback taking a node and the action that happened to it. The method
 * branches on {@code Action.ADD}; the visible remainder logs
 * "update ignored" for the node.
 *
 * NOTE(review): the body of the ADD branch is not visible in this view, and
 * the log statement presumably belongs to the non-ADD path - confirm.
 *
 * @param node node the notification is about
 * @param action kind of change reported for the node
 */
137 public void notifyNode(Node node, Action action) {
138 if (action == Action.ADD) {
141 LOG.info("update ignored: {}", node);
/**
 * Looks up the global {@code NodeCacheManager} and registers this object with
 * it as a cache listener, passing the service reference the bundle context
 * returns for {@code PipelineOrchestrator}; then looks up the global
 * {@code Southbound} instance.
 *
 * @param bundleContext used to obtain the {@code PipelineOrchestrator} service reference
 * @param serviceReference not used in the visible lines
 */
146 public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
147 NodeCacheManager nodeCacheManager =
148 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
149 nodeCacheManager.cacheListenerAdded(
150 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
// NOTE(review): the left-hand side of this lookup is not visible here -
// presumably it is assigned to the southbound field; confirm.
152 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
/**
 * No-op overload: the body is empty, so {@code impl} is ignored.
 *
 * @param impl unused
 */
156 public void setDependencies(Object impl) {}