Bug 6014 - Named Thread pool Executors for better debugging
[netvirt.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / netvirt / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.openstack.netvirt.providers.openflow13;
10
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Map;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ThreadFactory;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import org.opendaylight.netvirt.openstack.netvirt.api.Action;
21 import org.opendaylight.netvirt.openstack.netvirt.api.NodeCacheManager;
22 import org.opendaylight.netvirt.openstack.netvirt.providers.NetvirtProvidersProvider;
23 import org.opendaylight.netvirt.openstack.netvirt.api.NodeCacheListener;
24 import org.opendaylight.netvirt.openstack.netvirt.api.Southbound;
25 import org.opendaylight.netvirt.openstack.netvirt.providers.ConfigInterface;
26 import org.opendaylight.netvirt.utils.servicehelper.ServiceHelper;
27 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.ServiceReference;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
35
36 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
37     private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
38
39     /**
40      * Return the current table offset
41      * @return The table offset
42      */
43     @Override
44     public short getTableOffset() {
45         return NetvirtProvidersProvider.getTableOffset();
46     }
47
48     /**
49      * Return the offset adjusted table for the given {@link Service}
50      * @param service Identifies the openflow {@link Service}
51      * @return The table id
52      */
53     @Override
54     public short getTable(Service service) {
55         return (short)(getTableOffset() + service.getTable());
56     }
57
58     public List<Service> getStaticPipeline() {
59         return staticPipeline;
60     }
61
62     private List<Service> staticPipeline = Lists.newArrayList(
63             Service.CLASSIFIER,
64             Service.ARP_RESPONDER,
65             Service.INBOUND_NAT,
66             Service.EGRESS_ACL,
67             Service.LOAD_BALANCER,
68             Service.ROUTING,
69             Service.L3_FORWARDING,
70             Service.L2_REWRITE,
71             Service.INGRESS_ACL,
72             Service.OUTBOUND_NAT,
73             Service.L2_FORWARDING
74     );
75
76     public Map<Service, AbstractServiceInstance> getServiceRegistry() {
77         return serviceRegistry;
78     }
79
80     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
81     private volatile BlockingQueue<Node> queue;
82     private ExecutorService eventHandler;
83     private Southbound southbound;
84
85     public PipelineOrchestratorImpl() {
86         ThreadFactory threadFactory = new ThreadFactoryBuilder()
87             .setNameFormat("NV-PipelineOrch-%d").build();
88         eventHandler = Executors.newSingleThreadExecutor(threadFactory);
89         this.queue = new LinkedBlockingQueue<>();
90         LOG.info("PipelineOrchestratorImpl constructor");
91         start();
92     }
93
94     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
95         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
96         LOG.info("registerService {} - {}", serviceInstance, service);
97         serviceRegistry.put(service, serviceInstance);
98         // insert the service if not already there. The list is ordered based of table ID.
99         if (!staticPipeline.contains(service) && !isTableInPipeline(service.getTable())) {
100             staticPipeline.add(service);
101             Collections.sort(staticPipeline, Service.insertComparator);
102         }
103         LOG.info("registerService: {}", staticPipeline);
104     }
105
106     private boolean isTableInPipeline (short tableId) {
107         boolean found = false;
108         for (Service service : staticPipeline) {
109             if (service.getTable() == tableId) {
110                 found = true;
111                 break;
112             }
113         }
114         return found;
115     }
116
117     public void unregisterService(final ServiceReference ref) {
118         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
119     }
120     @Override
121     public Service getNextServiceInPipeline(Service service) {
122         int index = staticPipeline.indexOf(service);
123         if (index >= staticPipeline.size() - 1) {
124             return null;
125         }
126         return staticPipeline.get(index + 1);
127     }
128
129     @Override
130     public AbstractServiceInstance getServiceInstance(Service service) {
131         if (service == null) {
132             return null;
133         }
134         return serviceRegistry.get(service);
135     }
136
137     public final void start() {
138         eventHandler.submit(new Runnable()  {
139             @Override
140             public void run() {
141                 try {
142                     while (true) {
143                         Node node = queue.take();
144                         LOG.info(">>>>> dequeue: {}", node);
145                         if (southbound.getBridge(node) != null) {
146                             for (Service service : staticPipeline) {
147                                 AbstractServiceInstance serviceInstance = getServiceInstance(service);
148                                 if (serviceInstance != null) {
149                                     serviceInstance.programDefaultPipelineRule(node);
150                                 }
151                             }
152                             // TODO: might need a flow to go from table 0 to the pipeline
153                         }
154                     }
155                 } catch (Exception e) {
156                     LOG.warn("Processing interrupted, terminating ", e);
157                 }
158
159                 while (!queue.isEmpty()) {
160                     queue.poll();
161                 }
162                 queue = null;
163             }
164         });
165     }
166
167     public void stop() {
168         queue.clear();
169         eventHandler.shutdownNow();
170     }
171
172     @Override
173     public void enqueue(Node node) {
174         LOG.info(">>>>> enqueue: {}", node);
175         try {
176             queue.put(node);
177         } catch (InterruptedException e) {
178             LOG.warn("Failed to enqueue operation {}", node, e);
179         }
180     }
181
182     @Override
183     public void notifyNode(Node node, Action action) {
184         if (action == Action.ADD) {
185             enqueue(node);
186         } else {
187             LOG.info("update ignored: {}", node);
188         }
189     }
190
191     @Override
192     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
193         NodeCacheManager nodeCacheManager =
194                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
195         nodeCacheManager.cacheListenerAdded(
196                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
197         southbound =
198                 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
199     }
200
201     @Override
202     public void setDependencies(Object impl) {}
203 }