Merge "Remove incorrect or unnecessary relativePaths"
[netvirt.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
10
11 import com.google.common.collect.Lists;
12 import com.google.common.collect.Maps;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
23 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
24 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
25 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
26 import org.osgi.framework.BundleContext;
27 import org.osgi.framework.ServiceReference;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
32     private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
33     private List<Service> staticPipeline = Lists.newArrayList(
34             Service.CLASSIFIER,
35             Service.ARP_RESPONDER,
36             Service.INBOUND_NAT,
37             Service.EGRESS_ACL,
38             Service.LOAD_BALANCER,
39             Service.ROUTING,
40             Service.L3_FORWARDING,
41             Service.L2_REWRITE,
42             Service.INGRESS_ACL,
43             Service.OUTBOUND_NAT,
44             Service.L2_FORWARDING
45     );
46     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
47     private volatile BlockingQueue<Node> queue;
48     private ExecutorService eventHandler;
49     private Southbound southbound;
50
51     public PipelineOrchestratorImpl() {
52         eventHandler = Executors.newSingleThreadExecutor();
53         this.queue = new LinkedBlockingQueue<>();
54         LOG.info("PipelineOrchestratorImpl constructor");
55         start();
56     }
57
58     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
59         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
60         LOG.info("registerService {} - {}", serviceInstance, service);
61         serviceRegistry.put(service, serviceInstance);
62     }
63
64     public void unregisterService(final ServiceReference ref) {
65         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
66     }
67     @Override
68     public Service getNextServiceInPipeline(Service service) {
69         int index = staticPipeline.indexOf(service);
70         if (index >= staticPipeline.size() - 1) {
71             return null;
72         }
73         return staticPipeline.get(index + 1);
74     }
75
76     @Override
77     public AbstractServiceInstance getServiceInstance(Service service) {
78         if (service == null) {
79             return null;
80         }
81         return serviceRegistry.get(service);
82     }
83
84     public void start() {
85         eventHandler.submit(new Runnable()  {
86             @Override
87             public void run() {
88                 try {
89                     while (true) {
90                         Node node = queue.take();
91                         /*
92                          * Since we are hooking on OpendaylightInventoryListener and as observed in
93                          * Bug 1997 multiple Threads trying to write to a same table at the same time
94                          * causes programming issues. Hence delaying the programming by a second to
95                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
96                          */
97                         LOG.info(">>>>> dequeue: {}", node);
98                         Thread.sleep(1000);
99                         for (Service service : staticPipeline) {
100                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
101                             //LOG.info("pipeline: {} - {}", service, serviceInstance);
102                             if (serviceInstance != null) {
103                                 if (southbound.getBridge(node) != null) {
104                                     serviceInstance.programDefaultPipelineRule(node);
105                                 }
106                             }
107                         }
108                     }
109                 } catch (Exception e) {
110                     LOG.warn("Processing interrupted, terminating ", e);
111                 }
112
113                 while (!queue.isEmpty()) {
114                     queue.poll();
115                 }
116                 queue = null;
117             }
118         });
119     }
120
121     public void stop() {
122         queue.clear();
123         eventHandler.shutdownNow();
124     }
125
126     @Override
127     public void enqueue(Node node) {
128         LOG.info(">>>>> enqueue: {}", node);
129         try {
130             queue.put(node);
131         } catch (InterruptedException e) {
132             LOG.warn("Failed to enqueue operation {}", node, e);
133         }
134     }
135
136     @Override
137     public void notifyNode(Node node, Action action) {
138         if (action == Action.ADD) {
139             enqueue(node);
140         } else {
141             LOG.info("update ignored: {}", node);
142         }
143     }
144
145     @Override
146     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
147         NodeCacheManager nodeCacheManager =
148                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
149         nodeCacheManager.cacheListenerAdded(
150                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
151         southbound =
152                 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
153     }
154
155     @Override
156     public void setDependencies(Object impl) {}
157 }