2 * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.openstack.netvirt.providers.openflow13;
11 import java.util.Collections;
12 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ThreadFactory;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import org.opendaylight.netvirt.openstack.netvirt.api.Action;
21 import org.opendaylight.netvirt.openstack.netvirt.api.NodeCacheManager;
22 import org.opendaylight.netvirt.openstack.netvirt.providers.NetvirtProvidersProvider;
23 import org.opendaylight.netvirt.openstack.netvirt.api.NodeCacheListener;
24 import org.opendaylight.netvirt.openstack.netvirt.api.Southbound;
25 import org.opendaylight.netvirt.openstack.netvirt.providers.ConfigInterface;
26 import org.opendaylight.netvirt.utils.servicehelper.ServiceHelper;
27 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.ServiceReference;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
/**
 * Keeps a registry of service instances keyed by {@link Service} together with an ordered list of
 * services (the static pipeline), and reacts to node notifications by queueing the node so that a
 * background task can call {@code programDefaultPipelineRule(node)} on each registered service.
 *
 * NOTE(review): this listing of the class is incomplete (short lines such as closing braces are
 * absent), so confirm details against the full file.
 */
36 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
// Class-wide SLF4J logger.
37 private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
40 * Return the current table offset
41 * @return The table offset
44 public short getTableOffset() {
45 return NetvirtProvidersProvider.getTableOffset();
49 * Return the offset adjusted table for the given {@link Service}
50 * @param service Identifies the openflow {@link Service}
51 * @return The table id
54 public short getTable(Service service) {
55 return (short)(getTableOffset() + service.getTable());
58 public List<Service> getStaticPipeline() {
59 return staticPipeline;
// Ordered list of services making up the pipeline. registerService() may add further services and
// then re-sorts the list with Service.insertComparator; getNextServiceInPipeline() and the task
// started by start() read it.
// NOTE(review): only some of the initial entries are visible in this listing; the remaining
// entries and the closing ");" appear to be missing — confirm against the full file.
62 private List<Service> staticPipeline = Lists.newArrayList(
64 Service.ARP_RESPONDER,
67 Service.LOAD_BALANCER,
69 Service.L3_FORWARDING,
76 public Map<Service, AbstractServiceInstance> getServiceRegistry() {
77 return serviceRegistry;
// Registered service instances keyed by Service; created as a concurrent map and updated by
// registerService()/unregisterService().
80 Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
// Nodes waiting to be processed: enqueue() puts nodes in, the task submitted by start() takes them out.
81 private volatile BlockingQueue<Node> queue;
// Single-thread executor (created in the constructor) that runs the task submitted by start().
82 private ExecutorService eventHandler;
// Assigned in setDependencies(BundleContext, ServiceReference); start()'s task calls getBridge(node) on it.
83 private Southbound southbound;
85 public PipelineOrchestratorImpl() {
86 ThreadFactory threadFactory = new ThreadFactoryBuilder()
87 .setNameFormat("NV-PipelineOrch-%d").build();
88 eventHandler = Executors.newSingleThreadExecutor(threadFactory);
89 this.queue = new LinkedBlockingQueue<>();
90 LOG.info("PipelineOrchestratorImpl constructor");
94 public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
95 Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
96 LOG.info("registerService {} - {}", serviceInstance, service);
97 serviceRegistry.put(service, serviceInstance);
98 // insert the service if not already there. The list is ordered based of table ID.
99 if (!staticPipeline.contains(service) && !isTableInPipeline(service.getTable())) {
100 staticPipeline.add(service);
101 Collections.sort(staticPipeline, Service.insertComparator);
103 LOG.info("registerService: {}", staticPipeline);
106 private boolean isTableInPipeline (short tableId) {
107 boolean found = false;
108 for (Service service : staticPipeline) {
109 if (service.getTable() == tableId) {
117 public void unregisterService(final ServiceReference ref) {
118 serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
121 public Service getNextServiceInPipeline(Service service) {
122 int index = staticPipeline.indexOf(service);
123 if (index >= staticPipeline.size() - 1) {
126 return staticPipeline.get(index + 1);
130 public AbstractServiceInstance getServiceInstance(Service service) {
131 if (service == null) {
134 return serviceRegistry.get(service);
// Submits a task to the eventHandler executor. The visible part of that task takes nodes from the
// queue and, for each node for which southbound.getBridge(node) is non-null, calls
// programDefaultPipelineRule(node) on every registered service instance in staticPipeline order.
// NOTE(review): several short lines of this method (the run()/try/loop headers, closing braces and
// the end of the method) are absent from this listing — confirm the exact structure in the full file.
137 public final void start() {
138 eventHandler.submit(new Runnable() {
// Blocks until a node is available in the queue.
143 Node node = queue.take();
144 LOG.info(">>>>> dequeue: {}", node);
// Only nodes for which southbound reports a bridge are programmed.
145 if (southbound.getBridge(node) != null) {
146 for (Service service : staticPipeline) {
// Services in the pipeline without a registered instance are skipped.
147 AbstractServiceInstance serviceInstance = getServiceInstance(service);
148 if (serviceInstance != null) {
149 serviceInstance.programDefaultPipelineRule(node);
152 // TODO: might need a flow to go from table 0 to the pipeline
// Any exception raised in the guarded code is logged here as a warning.
// NOTE(review): presumably this ends the processing loop — the enclosing try/loop lines are not visible.
155 } catch (Exception e) {
156 LOG.warn("Processing interrupted, terminating ", e);
// Loops while the queue still holds nodes; the loop body is not visible in this listing
// (presumably it drains the queue — confirm).
159 while (!queue.isEmpty()) {
169 eventHandler.shutdownNow();
173 public void enqueue(Node node) {
174 LOG.info(">>>>> enqueue: {}", node);
177 } catch (InterruptedException e) {
178 LOG.warn("Failed to enqueue operation {}", node, e);
183 public void notifyNode(Node node, Action action) {
184 if (action == Action.ADD) {
187 LOG.info("update ignored: {}", node);
192 public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
193 NodeCacheManager nodeCacheManager =
194 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
195 nodeCacheManager.cacheListenerAdded(
196 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
198 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
// No-op overload: the body is empty, so the supplied object is ignored.
202 public void setDependencies(Object impl) {}