Add support for openflow node callbacks
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Madhu Venugopal
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
12
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
22 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
23 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
24 import org.osgi.framework.ServiceReference;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 public class PipelineOrchestratorImpl implements NodeCacheListener, PipelineOrchestrator {
29
30     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
31     private List<Service> staticPipeline = Lists.newArrayList(
32             Service.CLASSIFIER,
33             Service.ARP_RESPONDER,
34             Service.INBOUND_NAT,
35             Service.EGRESS_ACL,
36             Service.LOAD_BALANCER,
37             Service.ROUTING,
38             Service.L3_FORWARDING,
39             Service.L2_REWRITE,
40             Service.INGRESS_ACL,
41             Service.OUTBOUND_NAT,
42             Service.L2_FORWARDING
43     );
44     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
45     private volatile BlockingQueue<Node> queue;
46     private ExecutorService eventHandler;
47     public PipelineOrchestratorImpl() {
48     }
49
50     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
51         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
52         logger.info("registerService {} - {}", serviceInstance, service);
53         serviceRegistry.put(service, serviceInstance);
54     }
55
56     public void unregisterService(final ServiceReference ref) {
57         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
58     }
59     @Override
60     public Service getNextServiceInPipeline(Service service) {
61         int index = staticPipeline.indexOf(service);
62         if (index >= staticPipeline.size() - 1) return null;
63         return staticPipeline.get(index + 1);
64     }
65
66     @Override
67     public AbstractServiceInstance getServiceInstance(Service service) {
68         if (service == null) return null;
69         return serviceRegistry.get(service);
70     }
71
72     public void init() {
73         eventHandler = Executors.newSingleThreadExecutor();
74         this.queue = new LinkedBlockingQueue<Node>();
75         logger.info(">>>>> init PipelineOrchestratorImpl");
76     }
77
78     public void start() {
79         eventHandler.submit(new Runnable()  {
80             @Override
81             public void run() {
82                 try {
83                     while (true) {
84                         Node node = queue.take();
85                         /*
86                          * Since we are hooking on OpendaylightInventoryListener and as observed in
87                          * Bug 1997 multiple Threads trying to write to a same table at the same time
88                          * causes programming issues. Hence delaying the programming by a second to
89                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
90                          */
91                         logger.info(">>>>> dequeue: {}", node);
92                         Thread.sleep(1000);
93                         for (Service service : staticPipeline) {
94                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
95                             logger.info("pipeline: {} - {}", service, serviceInstance);
96                             if (serviceInstance != null) {
97                                 serviceInstance.programDefaultPipelineRule(node);
98                             }
99                         }
100                     }
101                 } catch (Exception e) {
102                     logger.warn("Processing interrupted, terminating ", e);
103                 }
104
105                 while (!queue.isEmpty()) {
106                     queue.poll();
107                 }
108                 queue = null;
109             }
110         });
111     }
112
113     public void stop() {
114         queue.clear();
115         eventHandler.shutdownNow();
116     }
117
118     @Override
119     public void enqueue(Node node) {
120         logger.info(">>>>> enqueue: {}", node);
121         try {
122             queue.put(node);
123         } catch (InterruptedException e) {
124             logger.warn("Failed to enqueue operation {}", node, e);
125         }
126     }
127
128     @Override
129     public void notifyNode(Node node, Action action) {
130         enqueue(node);
131     }
132 }