Merge "Revert "Added controller is-connected code""
[netvirt.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Madhu Venugopal
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
12
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import org.opendaylight.ovsdb.openstack.netvirt.MdsalUtils;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
23 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
24 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
25 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
26 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
27 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.ServiceReference;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
34     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
35     private List<Service> staticPipeline = Lists.newArrayList(
36             Service.CLASSIFIER,
37             Service.ARP_RESPONDER,
38             Service.INBOUND_NAT,
39             Service.EGRESS_ACL,
40             Service.LOAD_BALANCER,
41             Service.ROUTING,
42             Service.L3_FORWARDING,
43             Service.L2_REWRITE,
44             Service.INGRESS_ACL,
45             Service.OUTBOUND_NAT,
46             Service.L2_FORWARDING
47     );
48     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
49     private volatile BlockingQueue<Node> queue;
50     private ExecutorService eventHandler;
51
52     public PipelineOrchestratorImpl() {
53         eventHandler = Executors.newSingleThreadExecutor();
54         this.queue = new LinkedBlockingQueue<Node>();
55         logger.info("PipelineOrchestratorImpl constructor");
56         start();
57     }
58
59     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
60         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
61         logger.info("registerService {} - {}", serviceInstance, service);
62         serviceRegistry.put(service, serviceInstance);
63     }
64
65     public void unregisterService(final ServiceReference ref) {
66         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
67     }
68     @Override
69     public Service getNextServiceInPipeline(Service service) {
70         int index = staticPipeline.indexOf(service);
71         if (index >= staticPipeline.size() - 1) return null;
72         return staticPipeline.get(index + 1);
73     }
74
75     @Override
76     public AbstractServiceInstance getServiceInstance(Service service) {
77         if (service == null) return null;
78         return serviceRegistry.get(service);
79     }
80
81     public void start() {
82         eventHandler.submit(new Runnable()  {
83             @Override
84             public void run() {
85                 try {
86                     while (true) {
87                         Node node = queue.take();
88                         /*
89                          * Since we are hooking on OpendaylightInventoryListener and as observed in
90                          * Bug 1997 multiple Threads trying to write to a same table at the same time
91                          * causes programming issues. Hence delaying the programming by a second to
92                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
93                          */
94                         logger.info(">>>>> dequeue: {}", node);
95                         Thread.sleep(1000);
96                         for (Service service : staticPipeline) {
97                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
98                             //logger.info("pipeline: {} - {}", service, serviceInstance);
99                             if (serviceInstance != null) {
100                                 if (MdsalUtils.getBridge(node) != null) {
101                                     serviceInstance.programDefaultPipelineRule(node);
102                                 }
103                             }
104                         }
105                     }
106                 } catch (Exception e) {
107                     logger.warn("Processing interrupted, terminating ", e);
108                 }
109
110                 while (!queue.isEmpty()) {
111                     queue.poll();
112                 }
113                 queue = null;
114             }
115         });
116     }
117
118     public void stop() {
119         queue.clear();
120         eventHandler.shutdownNow();
121     }
122
123     @Override
124     public void enqueue(Node node) {
125         logger.info(">>>>> enqueue: {}", node);
126         try {
127             queue.put(node);
128         } catch (InterruptedException e) {
129             logger.warn("Failed to enqueue operation {}", node, e);
130         }
131     }
132
133     @Override
134     public void notifyNode(Node node, Action action) {
135         if (action == Action.ADD) {
136             enqueue(node);
137         } else {
138             logger.info("update ignored: {}", node);
139         }
140     }
141
142     @Override
143     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
144         NodeCacheManager nodeCacheManager =
145                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
146         nodeCacheManager.cacheListenerAdded(
147                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
148     }
149
150     @Override
151     public void setDependencies(Object impl) {
152
153     }
154 }