2 * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
11 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
18 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
19 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
22 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
23 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
24 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
25 import org.osgi.framework.BundleContext;
26 import org.osgi.framework.ServiceReference;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import com.google.common.collect.Lists;
31 import com.google.common.collect.Maps;
33 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
34 private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
35 private List<Service> staticPipeline = Lists.newArrayList(
37 Service.ARP_RESPONDER,
40 Service.LOAD_BALANCER,
42 Service.L3_FORWARDING,
48 Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
49 private volatile BlockingQueue<Node> queue;
50 private ExecutorService eventHandler;
51 private Southbound southbound;
53 public PipelineOrchestratorImpl() {
54 eventHandler = Executors.newSingleThreadExecutor();
55 this.queue = new LinkedBlockingQueue<>();
56 LOG.info("PipelineOrchestratorImpl constructor");
60 public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
61 Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
62 LOG.info("registerService {} - {}", serviceInstance, service);
63 serviceRegistry.put(service, serviceInstance);
66 public void unregisterService(final ServiceReference ref) {
67 serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
70 public Service getNextServiceInPipeline(Service service) {
71 int index = staticPipeline.indexOf(service);
72 if (index >= staticPipeline.size() - 1) {
75 return staticPipeline.get(index + 1);
79 public AbstractServiceInstance getServiceInstance(Service service) {
80 if (service == null) {
83 return serviceRegistry.get(service);
86 public final void start() {
87 eventHandler.submit(new Runnable() {
92 Node node = queue.take();
93 LOG.info(">>>>> dequeue: {}", node);
94 for (Service service : staticPipeline) {
95 AbstractServiceInstance serviceInstance = getServiceInstance(service);
96 if (serviceInstance != null && southbound.getBridge(node) != null) {
97 serviceInstance.programDefaultPipelineRule(node);
101 } catch (Exception e) {
102 LOG.warn("Processing interrupted, terminating ", e);
105 while (!queue.isEmpty()) {
115 eventHandler.shutdownNow();
119 public void enqueue(Node node) {
120 LOG.info(">>>>> enqueue: {}", node);
123 } catch (InterruptedException e) {
124 LOG.warn("Failed to enqueue operation {}", node, e);
129 public void notifyNode(Node node, Action action) {
130 if (action == Action.ADD) {
133 LOG.info("update ignored: {}", node);
138 public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
139 NodeCacheManager nodeCacheManager =
140 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
141 nodeCacheManager.cacheListenerAdded(
142 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
144 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
148 public void setDependencies(Object impl) {}