Added support to update flows for induvidual security rule add/remove ,
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (c) 2014, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
10
11 import java.util.List;
12 import java.util.Map;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
17
18 import org.opendaylight.ovsdb.openstack.netvirt.api.Action;
19 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheListener;
20 import org.opendaylight.ovsdb.openstack.netvirt.api.NodeCacheManager;
21 import org.opendaylight.ovsdb.openstack.netvirt.api.Southbound;
22 import org.opendaylight.ovsdb.openstack.netvirt.providers.ConfigInterface;
23 import org.opendaylight.ovsdb.utils.servicehelper.ServiceHelper;
24 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
25 import org.osgi.framework.BundleContext;
26 import org.osgi.framework.ServiceReference;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.google.common.collect.Lists;
31 import com.google.common.collect.Maps;
32
33 public class PipelineOrchestratorImpl implements ConfigInterface, NodeCacheListener, PipelineOrchestrator {
34     private static final Logger LOG = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
35     private List<Service> staticPipeline = Lists.newArrayList(
36             Service.CLASSIFIER,
37             Service.ARP_RESPONDER,
38             Service.INBOUND_NAT,
39             Service.EGRESS_ACL,
40             Service.LOAD_BALANCER,
41             Service.ROUTING,
42             Service.L3_FORWARDING,
43             Service.L2_REWRITE,
44             Service.INGRESS_ACL,
45             Service.OUTBOUND_NAT,
46             Service.L2_FORWARDING
47     );
48     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
49     private volatile BlockingQueue<Node> queue;
50     private ExecutorService eventHandler;
51     private Southbound southbound;
52
53     public PipelineOrchestratorImpl() {
54         eventHandler = Executors.newSingleThreadExecutor();
55         this.queue = new LinkedBlockingQueue<>();
56         LOG.info("PipelineOrchestratorImpl constructor");
57         start();
58     }
59
60     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
61         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
62         LOG.info("registerService {} - {}", serviceInstance, service);
63         serviceRegistry.put(service, serviceInstance);
64     }
65
66     public void unregisterService(final ServiceReference ref) {
67         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
68     }
69     @Override
70     public Service getNextServiceInPipeline(Service service) {
71         int index = staticPipeline.indexOf(service);
72         if (index >= staticPipeline.size() - 1) {
73             return null;
74         }
75         return staticPipeline.get(index + 1);
76     }
77
78     @Override
79     public AbstractServiceInstance getServiceInstance(Service service) {
80         if (service == null) {
81             return null;
82         }
83         return serviceRegistry.get(service);
84     }
85
86     public final void start() {
87         eventHandler.submit(new Runnable()  {
88             @Override
89             public void run() {
90                 try {
91                     while (true) {
92                         Node node = queue.take();
93                         LOG.info(">>>>> dequeue: {}", node);
94                         for (Service service : staticPipeline) {
95                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
96                             if (serviceInstance != null && southbound.getBridge(node) != null) {
97                                 serviceInstance.programDefaultPipelineRule(node);
98                             }
99                         }
100                     }
101                 } catch (Exception e) {
102                     LOG.warn("Processing interrupted, terminating ", e);
103                 }
104
105                 while (!queue.isEmpty()) {
106                     queue.poll();
107                 }
108                 queue = null;
109             }
110         });
111     }
112
113     public void stop() {
114         queue.clear();
115         eventHandler.shutdownNow();
116     }
117
118     @Override
119     public void enqueue(Node node) {
120         LOG.info(">>>>> enqueue: {}", node);
121         try {
122             queue.put(node);
123         } catch (InterruptedException e) {
124             LOG.warn("Failed to enqueue operation {}", node, e);
125         }
126     }
127
128     @Override
129     public void notifyNode(Node node, Action action) {
130         if (action == Action.ADD) {
131             enqueue(node);
132         } else {
133             LOG.info("update ignored: {}", node);
134         }
135     }
136
137     @Override
138     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {
139         NodeCacheManager nodeCacheManager =
140                 (NodeCacheManager) ServiceHelper.getGlobalInstance(NodeCacheManager.class, this);
141         nodeCacheManager.cacheListenerAdded(
142                 bundleContext.getServiceReference(PipelineOrchestrator.class.getName()), this);
143         southbound =
144                 (Southbound) ServiceHelper.getGlobalInstance(Southbound.class, this);
145     }
146
147     @Override
148     public void setDependencies(Object impl) {}
149 }