Update .gitreview for new repo
[netvirt.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
10
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18
19 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
23 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
24 import org.opendaylight.ovsdb.openstack.netvirt.providers.NetvirtProvidersProvider;
25 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
27 import org.osgi.framework.BundleContext;
28 import org.osgi.framework.ServiceReference;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.google.common.collect.Lists;
33 import com.google.common.collect.Maps;
34
35 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
36     private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
37
38     /**
39      * Return the current table offset
40      * @return The table offset
41      */
42     @Override
43     public short getTableOffset() {
44         return NetvirtProvidersProvider.getTableOffset();
45     }
46
47     /**
48      * Return the offset adjusted table for the given {@link Service}
49      * @param service Identifies the openflow {@link Service}
50      * @return The table id
51      */
52     @Override
53     public short getTable(Service service) {
54         return (short)(getTableOffset() + service.getTable());
55     }
56
57     public List<Service> getStaticPipeline() {
58         return staticPipeline;
59     }
60
61     private List<Service> staticPipeline = Lists.newArrayList(
62             Service.CLASSIFIER,
63             Service.ARP_RESPONDER,
64             Service.INBOUND_NAT,
65             Service.EGRESS_ACL,
66             Service.LOAD_BALANCER,
67             Service.ROUTING,
68             Service.L3_FORWARDING,
69             Service.L2_REWRITE,
70             Service.INGRESS_ACL,
71             Service.OUTBOUND_NAT,
72             Service.L2_FORWARDING
73     );
74
75     public Map<Service, AbstractServiceInstance> getServiceRegistry() {
76         return serviceRegistry;
77     }
78
79     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
80     private volatile BlockingQueue<Node> queue;
81     private ExecutorService eventHandler;
82     private Southbound southbound;
83
84     public PipelineOrchestratorImpl() {
85         eventHandler = Executors.newSingleThreadExecutor();
86         this.queue = new LinkedBlockingQueue<>();
87         LOG.info("PipelineOrchestratorImpl constructor");
88         start();
89     }
90
91     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
92         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
93         LOG.info("registerService {} - {}", serviceInstance, service);
94         serviceRegistry.put(service, serviceInstance);
95         // insert the service if not already there. The list is ordered based of table ID.
96         if (!staticPipeline.contains(service) && !isTableInPipeline(service.getTable())) {
97             staticPipeline.add(service);
98             Collections.sort(staticPipeline, Service.insertComparator);
99         }
100         LOG.info("registerService: {}", staticPipeline);
101     }
102
103     private boolean isTableInPipeline (short tableId) {
104         boolean found = false;
105         for (Service service : staticPipeline) {
106             if (service.getTable() == tableId) {
107                 found = true;
108                 break;
109             }
110         }
111         return found;
112     }
113
114     public void unregisterService(final ServiceReference ref) {
115         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
116     }
117     @Override
118     public Service getNextServiceInPipeline(Service service) {
119         int index = staticPipeline.indexOf(service);
120         if (index >= staticPipeline.size() - 1) {
121             return null;
122         }
123         return staticPipeline.get(index + 1);
124     }
125
126     @Override
127     public AbstractServiceInstance getServiceInstance(Service service) {
128         if (service == null) {
129             return null;
130         }
131         return serviceRegistry.get(service);
132     }
133
134     public final void start() {
135         eventHandler.submit(new Runnable()  {
136             @Override
137             public void run() {
138                 try {
139                     while (true) {
140                         Node node = queue.take();
141                         LOG.info(">>>>> dequeue: {}", node);
142                         if (southbound.getBridge(node) != null) {
143                             for (Service service : staticPipeline) {
144                                 AbstractServiceInstance serviceInstance = getServiceInstance(service);
145                                 if (serviceInstance != null) {
146                                     serviceInstance.programDefaultPipelineRule(node);
147                                 }
148                             }
149                             // TODO: might need a flow to go from table 0 to the pipeline
150                         }
151                     }
152                 } catch (Exception e) {
153                     LOG.warn("Processing interrupted, terminating ", e);
154                 }
155
156                 while (!queue.isEmpty()) {
157                     queue.poll();
158                 }
159                 queue = null;
160             }
161         });
162     }
163
164     public void stop() {
165         queue.clear();
166         eventHandler.shutdownNow();
167     }
168
169     @Override
170     public void enqueue(Node node) {
171         LOG.info(">>>>> enqueue: {}", node);
172         try {
173             queue.put(node);
174         } catch (InterruptedException e) {
175             LOG.warn("Failed to enqueue operation {}", node, e);
176         }
177     }
178
179     @Override
180     public void notifyNode(Node node, Action action) {
181         if (action == Action.ADD) {
182             enqueue(node);
183         } else {
184             LOG.info("update ignored: {}", node);
185         }
186     }
187
188     @Override
189     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
190         NodeCacheManager nodeCacheManager =
191                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
192         nodeCacheManager.cacheListenerAdded(
193                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
194         southbound =
195                 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
196     }
197
198     @Override
199     public void setDependencies(Object impl) {}
200 }