Bug 2045 - Egress / Ingress ACL swap broke the pipeline
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / PipelineOrchestratorImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Madhu Venugopal
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
12
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.LinkedBlockingQueue;
19
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
35 import org.opendaylight.yangtools.yang.binding.DataObject;
36 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
38 import org.osgi.framework.ServiceReference;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.base.Optional;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Maps;
45 import com.google.common.util.concurrent.CheckedFuture;
46 import com.google.common.util.concurrent.FutureCallback;
47 import com.google.common.util.concurrent.Futures;
48
49 public class PipelineOrchestratorImpl implements PipelineOrchestrator, OpendaylightInventoryListener, TransactionChainListener {
50
51     private static final Logger logger = LoggerFactory.getLogger(PipelineOrchestratorImpl.class);
52     private List<Service> staticPipeline = Lists.newArrayList(
53                                                                 Service.CLASSIFIER,
54                                                                 Service.ARP_RESPONDER,
55                                                                 Service.INBOUND_NAT,
56                                                                 Service.EGRESS_ACL,
57                                                                 Service.LOAD_BALANCER,
58                                                                 Service.ROUTING,
59                                                                 Service.L3_FORWARDING,
60                                                                 Service.L2_REWRITE,
61                                                                 Service.INGRESS_ACL,
62                                                                 Service.OUTBOUND_NAT,
63                                                                 Service.L2_FORWARDING
64                                                               );
65     Map<Service, AbstractServiceInstance> serviceRegistry = Maps.newConcurrentMap();
66     private volatile MdsalConsumer mdsalConsumer;
67     private volatile BlockingQueue<String> queue;
68     private ExecutorService eventHandler;
69     public PipelineOrchestratorImpl() {
70     }
71
72     public void registerService(final ServiceReference ref, AbstractServiceInstance serviceInstance){
73         Service service = (Service)ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY);
74         serviceRegistry.put(service, serviceInstance);
75     }
76
77     public void unregisterService(final ServiceReference ref) {
78         serviceRegistry.remove(ref.getProperty(AbstractServiceInstance.SERVICE_PROPERTY));
79     }
80     @Override
81     public Service getNextServiceInPipeline(Service service) {
82         int index = staticPipeline.indexOf(service);
83         if (index >= staticPipeline.size() - 1) return null;
84         return staticPipeline.get(index + 1);
85     }
86
87     @Override
88     public AbstractServiceInstance getServiceInstance(Service service) {
89         if (service == null) return null;
90         return serviceRegistry.get(service);
91     }
92
93     public void init() {
94         eventHandler = Executors.newSingleThreadExecutor();
95         this.queue = new LinkedBlockingQueue<String>();
96         NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
97         if (notificationService != null) {
98             notificationService.registerNotificationListener(this);
99         }
100     }
101     public void start() {
102         eventHandler.submit(new Runnable()  {
103             @Override
104             public void run() {
105                 try {
106                     while (true) {
107                         String nodeId = queue.take();
108                         /*
109                          * Since we are hooking on OpendaylightInventoryListener and as observed in
110                          * Bug 1997 multiple Threads trying to write to a same table at the same time
111                          * causes programming issues. Hence delaying the programming by a second to
112                          * avoid the clash. This hack/workaround should be removed once Bug 1997 is resolved.
113                          */
114                         Thread.sleep(1000);
115                         for (Service service : staticPipeline) {
116                             AbstractServiceInstance serviceInstance = getServiceInstance(service);
117                             serviceInstance.programDefaultPipelineRule(nodeId);
118                         }
119                     }
120                 } catch (Exception e) {
121                     logger.warn("Processing interrupted, terminating ", e);
122                 }
123
124                 while (!queue.isEmpty()) {
125                     queue.poll();
126                 }
127                 queue = null;
128             }
129         });
130     }
131
132     public void stop() {
133         queue.clear();
134         eventHandler.shutdownNow();
135     }
136
137     void enqueue(String nodeId) {
138         try {
139             queue.put(new String(nodeId));
140         } catch (InterruptedException e) {
141             logger.warn("Failed to enqueue operation {}", nodeId, e);
142         }
143     }
144
145
146     /**
147      * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
148      * programming the Pipeline specific flows.
149      */
150     @Override
151     public void onNodeUpdated(NodeUpdated nodeUpdated) {
152         NodeRef ref = nodeUpdated.getNodeRef();
153         InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
154         logger.debug("Node Update received for : "+identifier.toString());
155         final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
156         final String nodeId = key.getId().getValue();
157         if (key != null && key.getId().getValue().contains("openflow")) {
158             InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
159             InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
160             final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
161             BindingTransactionChain txChain = mdsalConsumer.getDataBroker().createTransactionChain(this);
162             CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
163             Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
164                 @Override
165                 public void onSuccess(Optional<? extends DataObject> optional) {
166                     if (!optional.isPresent()) {
167                         enqueue(nodeId);
168                     }
169                 }
170
171                 @Override
172                 public void onFailure(Throwable throwable) {
173                     logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
174                     enqueue(nodeId);
175                 }
176             });
177         }
178     }
179
180     @Override
181     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
182             final Throwable cause) {
183         logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
184     }
185
186     @Override
187     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
188     }
189
190     @Override
191     public void onNodeConnectorRemoved(NodeConnectorRemoved arg0) {
192         // TODO Auto-generated method stub
193
194     }
195
196     @Override
197     public void onNodeConnectorUpdated(NodeConnectorUpdated arg0) {
198         // TODO Auto-generated method stub
199
200     }
201
202     @Override
203     public void onNodeRemoved(NodeRemoved arg0) {
204         // TODO Auto-generated method stub
205
206     }
207
208 }