Merge "L3: Add eth to br-ex"
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
10
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18
19 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
23 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
24 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
25 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
26 import org.osgi.framework.BundleContext;
27 import org.osgi.framework.ServiceReference;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import com.google.common.collect.Lists;
32 import com.google.common.collect.Maps;
33
34 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
35     private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
36
37     public List<Service> getStaticPipeline() {
38         return staticPipeline;
39     }
40
41     private List<Service> staticPipeline = Lists.newArrayList(
42             Service.CLASSIFIER,
43             Service.ARP_RESPONDER,
44             Service.INBOUND_NAT,
45             Service.EGRESS_ACL,
46             Service.LOAD_BALANCER,
47             Service.ROUTING,
48             Service.L3_FORWARDING,
49             Service.L2_REWRITE,
50             Service.INGRESS_ACL,
51             Service.OUTBOUND_NAT,
52             Service.L2_FORWARDING
53     );
54
55     public Map<Service, AbstractServiceInstance> getServiceRegistry() {
56         return serviceRegistry;
57     }
58
59     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
60     private volatile BlockingQueue<Node> queue;
61     private ExecutorService eventHandler;
62     private Southbound southbound;
63
64     public PipelineOrchestratorImpl() {
65         eventHandler = Executors.newSingleThreadExecutor();
66         this.queue = new LinkedBlockingQueue<>();
67         LOG.info("PipelineOrchestratorImpl constructor");
68         start();
69     }
70
71     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
72         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
73         LOG.info("registerService {} - {}", serviceInstance, service);
74         serviceRegistry.put(service, serviceInstance);
75         // insert the service if not already there. The list is ordered based of table ID.
76         if (!staticPipeline.contains(service) && !isTableInPipeline(service.getTable())) {
77             staticPipeline.add(service);
78             Collections.sort(staticPipeline, Service.insertComparator);
79         }
80         LOG.info("registerService: {}", staticPipeline);
81     }
82
83     private boolean isTableInPipeline (short tableId) {
84         boolean found = false;
85         for (Service service : staticPipeline) {
86             if (service.getTable() == tableId) {
87                 found = true;
88                 break;
89             }
90         }
91         return found;
92     }
93
94     public void unregisterService(final ServiceReference ref) {
95         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
96     }
97     @Override
98     public Service getNextServiceInPipeline(Service service) {
99         int index = staticPipeline.indexOf(service);
100         if (index >= staticPipeline.size() - 1) {
101             return null;
102         }
103         return staticPipeline.get(index + 1);
104     }
105
106     @Override
107     public AbstractServiceInstance getServiceInstance(Service service) {
108         if (service == null) {
109             return null;
110         }
111         return serviceRegistry.get(service);
112     }
113
114     public final void start() {
115         eventHandler.submit(new Runnable()  {
116             @Override
117             public void run() {
118                 try {
119                     while (true) {
120                         Node node = queue.take();
121                         LOG.info(">>>>> dequeue: {}", node);
122                         for (Service service : staticPipeline) {
123                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
124                             if (serviceInstance != null && southbound.getBridge(node) != null) {
125                                 serviceInstance.programDefaultPipelineRule(node);
126                             }
127                         }
128                     }
129                 } catch (Exception e) {
130                     LOG.warn("Processing interrupted, terminating ", e);
131                 }
132
133                 while (!queue.isEmpty()) {
134                     queue.poll();
135                 }
136                 queue = null;
137             }
138         });
139     }
140
141     public void stop() {
142         queue.clear();
143         eventHandler.shutdownNow();
144     }
145
146     @Override
147     public void enqueue(Node node) {
148         LOG.info(">>>>> enqueue: {}", node);
149         try {
150             queue.put(node);
151         } catch (InterruptedException e) {
152             LOG.warn("Failed to enqueue operation {}", node, e);
153         }
154     }
155
156     @Override
157     public void notifyNode(Node node, Action action) {
158         if (action == Action.ADD) {
159             enqueue(node);
160         } else {
161             LOG.info("update ignored: {}", node);
162         }
163     }
164
165     @Override
166     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
167         NodeCacheManager nodeCacheManager =
168                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
169         nodeCacheManager.cacheListenerAdded(
170                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
171         southbound =
172                 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
173     }
174
175     @Override
176     public void setDependencies(Object impl) {}
177 }