Bug 2707: Pipeline flows are not programmed because of failed onNodeUpdated call
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Madhu Venugopal
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
12
13 import com.google.common.collect.Lists;
14 import com.google.common.collect.Maps;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import org.osgi.framework.ServiceReference;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public class PipelineOrchestratorImpl implements PipelineOrchestrator {
26
27     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
28     private List<Service> staticPipeline = Lists.newArrayList(
29                                                                 Service.CLASSIFIER,
30                                                                 Service.ARP_RESPONDER,
31                                                                 Service.INBOUND_NAT,
32                                                                 Service.EGRESS_ACL,
33                                                                 Service.LOAD_BALANCER,
34                                                                 Service.ROUTING,
35                                                                 Service.L3_FORWARDING,
36                                                                 Service.L2_REWRITE,
37                                                                 Service.INGRESS_ACL,
38                                                                 Service.OUTBOUND_NAT,
39                                                                 Service.L2_FORWARDING
40                                                               );
41     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
42     private volatile BlockingQueue<String> queue;
43     private ExecutorService eventHandler;
44     public PipelineOrchestratorImpl() {
45     }
46
47     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
48         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
49         serviceRegistry.put(service, serviceInstance);
50     }
51
52     public void unregisterService(final ServiceReference ref) {
53         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
54     }
55     @Override
56     public Service getNextServiceInPipeline(Service service) {
57         int index = staticPipeline.indexOf(service);
58         if (index >= staticPipeline.size() - 1) return null;
59         return staticPipeline.get(index + 1);
60     }
61
62     @Override
63     public AbstractServiceInstance getServiceInstance(Service service) {
64         if (service == null) return null;
65         return serviceRegistry.get(service);
66     }
67
68     public void init() {
69         eventHandler = Executors.newSingleThreadExecutor();
70         this.queue = new LinkedBlockingQueue<String>();
71     }
72
73     public void start() {
74         eventHandler.submit(new Runnable()  {
75             @Override
76             public void run() {
77                 try {
78                     while (true) {
79                         String nodeId = queue.take();
80                         /*
81                          * Since we are hooking on OpendaylightInventoryListener and as observed in
82                          * Bug 1997 multiple Threads trying to write to a same table at the same time
83                          * causes programming issues. Hence delaying the programming by a second to
84                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
85                          */
86                         logger.info(">>>>> dequeue: {}", nodeId);
87                         Thread.sleep(1000);
88                         for (Service service : staticPipeline) {
89                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
90                             serviceInstance.programDefaultPipelineRule(nodeId);
91                         }
92                     }
93                 } catch (Exception e) {
94                     logger.warn("Processing interrupted, terminating ", e);
95                 }
96
97                 while (!queue.isEmpty()) {
98                     queue.poll();
99                 }
100                 queue = null;
101             }
102         });
103     }
104
105     public void stop() {
106         queue.clear();
107         eventHandler.shutdownNow();
108     }
109
110     @Override
111     public void enqueue(String nodeId) {
112         logger.info(">>>>> enqueue: {}", nodeId);
113         try {
114             queue.put(new String(nodeId));
115         } catch (InterruptedException e) {
116             logger.warn("Failed to enqueue operation {}", nodeId, e);
117         }
118     }
119 }