934f763bf4d790be89d1750c451b39a35b28a330
[ovsdb.git] / openstack / net-virt-providers / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / providers / openflow13 / AbstractServiceInstance.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Madhu Venugopal
9  */
10 package org.opendaylight.ovsdb.openstack.netvirt.providers.openflow13;
11
12 import java.util.List;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.LinkedBlockingDeque;
16
17 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
20 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
21 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
27 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
28 import org.opendaylight.ovsdb.utils.mdsal.openflow.InstructionUtils;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.InstructionsBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.InstructionKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
52 import org.opendaylight.yangtools.yang.binding.DataObject;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import com.google.common.base.Optional;
59 import com.google.common.base.Preconditions;
60 import com.google.common.collect.Lists;
61 import com.google.common.util.concurrent.CheckedFuture;
62 import com.google.common.util.concurrent.FutureCallback;
63 import com.google.common.util.concurrent.Futures;
64
65 /**
66  * Any ServiceInstance class that extends AbstractServiceInstance to be a part of the pipeline
67  * have 2 basic requirements : <br>
68  * 1. Program a default pipeline flow to take any unmatched traffic to the next table in the pipeline. <br>
69  * 2. Get Pipeline Instructions from AbstractServiceInstance (using getMutablePipelineInstructionBuilder) and
70  *    use it in any matching flows that needs to be further processed by next service in the pipeline.
71  *
72  */
73 public abstract class AbstractServiceInstance implements OpendaylightInventoryListener, Runnable, TransactionChainListener {
74     public static final String SERVICE_PROPERTY ="serviceProperty";
75     private static final Logger logger = LoggerFactory.getLogger(AbstractServiceInstance.class);
76     public static final String OPENFLOW = "openflow:";
77     // OSGi Services that we are dependent on.
78     private volatile MdsalConsumer mdsalConsumer;
79     private volatile PipelineOrchestrator orchestrator;
80
81     // Concrete Service that this AbstractServiceInstance represent
82     private Service service;
83
84     private BindingTransactionChain txChain;
85
86     // Process Notification in its own thread
87     Thread thread = null;
88     private final BlockingQueue<String> queue = new LinkedBlockingDeque<>();
89
90     public AbstractServiceInstance (Service service) {
91         this.service = service;
92     }
93
94     // Let the Concrete service instance class decide if a Bride is part of the pipeline or not.
95     public abstract boolean isBridgeInPipeline (String nodeId);
96
97     public short getTable() {
98         return service.getTable();
99     }
100
101     public Service getService() {
102         return service;
103     }
104
105     public void setService(Service service) {
106         this.service = service;
107     }
108
109     public void start() {
110         // Register for OpenFlow bridge/node Creation notification.
111         NotificationProviderService notificationService = mdsalConsumer.getNotificationService();
112         if (notificationService != null) {
113             notificationService.registerNotificationListener(this);
114         }
115         this.txChain =  mdsalConsumer.getDataBroker().createTransactionChain(this);
116
117         // Never block a Notification thread. Process the notification in its own Thread.
118         thread = new Thread(this);
119         thread.setDaemon(true);
120         thread.setName("AbstractServiceInstance-"+service.toString());
121         thread.start();
122     }
123
124     public NodeBuilder createNodeBuilder(String nodeId) {
125         NodeBuilder builder = new NodeBuilder();
126         builder.setId(new NodeId(nodeId));
127         builder.setKey(new NodeKey(builder.getId()));
128         return builder;
129     }
130
131     /**
132      * This method returns the required Pipeline Instructions to by used by any matching flows that needs
133      * to be further processed by next service in the pipeline.
134      *
135      * Important to note that this is a convenience method which returns a mutable instructionBuilder which
136      * needs to be further adjusted by the concrete ServiceInstance class such as setting the Instruction Order, etc.
137      * @return Newly created InstructionBuilder to be used along with other instructions on the main flow
138      */
139     protected final InstructionBuilder getMutablePipelineInstructionBuilder() {
140         Service nextService = orchestrator.getNextServiceInPipeline(service);
141         if (nextService != null) {
142             return InstructionUtils.createGotoTableInstructions(new InstructionBuilder(), nextService.getTable());
143         } else {
144             return InstructionUtils.createDropInstructions(new InstructionBuilder());
145         }
146     }
147
148     protected void writeFlow(FlowBuilder flowBuilder, NodeBuilder nodeBuilder) {
149         Preconditions.checkNotNull(mdsalConsumer);
150         if (mdsalConsumer == null) {
151             logger.error("ERROR finding MDSAL Service. Its possible that writeFlow is called too soon ?");
152             return;
153         }
154
155         DataBroker dataBroker = mdsalConsumer.getDataBroker();
156         if (dataBroker == null) {
157             logger.error("ERROR finding reference for DataBroker. Please check MD-SAL support on the Controller.");
158             return;
159         }
160
161         ReadWriteTransaction modification = dataBroker.newReadWriteTransaction();
162         InstanceIdentifier<Flow> path1 = InstanceIdentifier.builder(Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory
163                 .rev130819.nodes.Node.class, nodeBuilder.getKey()).augmentation(FlowCapableNode.class).child(Table.class,
164                 new TableKey(flowBuilder.getTableId())).child(Flow.class, flowBuilder.getKey()).build();
165
166         //modification.put(LogicalDatastoreType.OPERATIONAL, path1, flowBuilder.build());
167         modification.put(LogicalDatastoreType.CONFIGURATION, path1, flowBuilder.build(), true /*createMissingParents*/);
168
169
170         CheckedFuture<Void, TransactionCommitFailedException> commitFuture = modification.submit();
171         try {
172             commitFuture.get();  // TODO: Make it async (See bug 1362)
173             logger.debug("Transaction success for write of Flow "+flowBuilder.getFlowName());
174         } catch (InterruptedException|ExecutionException e) {
175             logger.error(e.getMessage(), e);
176
177         }
178     }
179
180     protected void removeFlow(FlowBuilder flowBuilder, NodeBuilder nodeBuilder) {
181         Preconditions.checkNotNull(mdsalConsumer);
182         if (mdsalConsumer == null) {
183             logger.error("ERROR finding MDSAL Service.");
184             return;
185         }
186
187         DataBroker dataBroker = mdsalConsumer.getDataBroker();
188         if (dataBroker == null) {
189             logger.error("ERROR finding reference for DataBroker. Please check MD-SAL support on the Controller.");
190             return;
191         }
192
193         WriteTransaction modification = dataBroker.newWriteOnlyTransaction();
194         InstanceIdentifier<Flow> path1 = InstanceIdentifier.builder(Nodes.class)
195                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory
196                                .rev130819.nodes.Node.class, nodeBuilder.getKey())
197                 .augmentation(FlowCapableNode.class).child(Table.class,
198                                                            new TableKey(flowBuilder.getTableId())).child(Flow.class, flowBuilder.getKey()).build();
199         //modification.delete(LogicalDatastoreType.OPERATIONAL, nodeBuilderToInstanceId(nodeBuilder));
200         //modification.delete(LogicalDatastoreType.OPERATIONAL, path1);
201         //modification.delete(LogicalDatastoreType.CONFIGURATION, nodeBuilderToInstanceId(nodeBuilder));
202         modification.delete(LogicalDatastoreType.CONFIGURATION, path1);
203
204         CheckedFuture<Void, TransactionCommitFailedException> commitFuture = modification.submit();
205         try {
206             commitFuture.get();  // TODO: Make it async (See bug 1362)
207             logger.debug("Transaction success for deletion of Flow "+flowBuilder.getFlowName());
208         } catch (InterruptedException|ExecutionException e) {
209             logger.error(e.getMessage(), e);
210         }
211     }
212
213     public Flow getFlow(FlowBuilder flowBuilder, NodeBuilder nodeBuilder) {
214         Preconditions.checkNotNull(mdsalConsumer);
215         if (mdsalConsumer == null) {
216             logger.error("ERROR finding MDSAL Service. Its possible that writeFlow is called too soon ?");
217             return null;
218         }
219
220         DataBroker dataBroker = mdsalConsumer.getDataBroker();
221         if (dataBroker == null) {
222             logger.error("ERROR finding reference for DataBroker. Please check MD-SAL support on the Controller.");
223             return null;
224         }
225
226         InstanceIdentifier<Flow> path1 = InstanceIdentifier.builder(Nodes.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory
227                 .rev130819.nodes.Node.class, nodeBuilder.getKey()).augmentation(FlowCapableNode.class).child(Table.class,
228                 new TableKey(flowBuilder.getTableId())).child(Flow.class, flowBuilder.getKey()).build();
229
230         ReadOnlyTransaction readTx = dataBroker.newReadOnlyTransaction();
231         try {
232             Optional<Flow> data = readTx.read(LogicalDatastoreType.CONFIGURATION, path1).get();
233             if (data.isPresent()) {
234                 return data.get();
235             }
236         } catch (InterruptedException|ExecutionException e) {
237             logger.error(e.getMessage(), e);
238         }
239
240         logger.debug("Cannot find data for Flow " + flowBuilder.getFlowName());
241         return null;
242     }
243
244     /**
245      * Program Default Pipeline Flow.
246      *
247      * @param nodeId Node on which the default pipeline flow is programmed.
248      */
249     protected void programDefaultPipelineRule(String nodeId) {
250         MatchBuilder matchBuilder = new MatchBuilder();
251         FlowBuilder flowBuilder = new FlowBuilder();
252         NodeBuilder nodeBuilder = createNodeBuilder(nodeId);
253
254         // Create the OF Actions and Instructions
255         InstructionsBuilder isb = new InstructionsBuilder();
256
257         // Instructions List Stores Individual Instructions
258         List<Instruction> instructions = Lists.newArrayList();
259
260         // Call the InstructionBuilder Methods Containing Actions
261         InstructionBuilder ib = this.getMutablePipelineInstructionBuilder();
262         ib.setOrder(0);
263         ib.setKey(new InstructionKey(0));
264         instructions.add(ib.build());
265
266         // Add InstructionBuilder to the Instruction(s)Builder List
267         isb.setInstruction(instructions);
268
269         // Add InstructionsBuilder to FlowBuilder
270         flowBuilder.setInstructions(isb.build());
271
272         String flowId = "DEFAULT_PIPELINE_FLOW";
273         flowBuilder.setId(new FlowId(flowId));
274         FlowKey key = new FlowKey(new FlowId(flowId));
275         flowBuilder.setMatch(matchBuilder.build());
276         flowBuilder.setPriority(0);
277         flowBuilder.setBarrier(true);
278         flowBuilder.setTableId(service.getTable());
279         flowBuilder.setKey(key);
280         flowBuilder.setFlowName(flowId);
281         flowBuilder.setHardTimeout(0);
282         flowBuilder.setIdleTimeout(0);
283         writeFlow(flowBuilder, nodeBuilder);
284     }
285
286     @Override
287     public void onNodeConnectorRemoved(NodeConnectorRemoved nodeConector) {
288     }
289
290     @Override
291     public void onNodeConnectorUpdated(NodeConnectorUpdated nodeConnector) {
292     }
293
294     @Override
295     public void onNodeRemoved(NodeRemoved node) {
296     }
297
298
299     @Override
300     public void run() {
301         try {
302             for (; ; ) {
303                 String nodeId = queue.take();
304                 this.programDefaultPipelineRule(nodeId);
305             }
306         } catch (InterruptedException e) {
307             logger.warn("Processing interrupted, terminating", e);
308         }
309
310         while (!queue.isEmpty()) {
311             queue.poll();
312         }
313
314     }
315
316     void enqueue(final String nodeId) {
317         try {
318             queue.put(nodeId);
319         } catch (InterruptedException e) {
320             logger.warn("Failed to enqueue operation {}", nodeId, e);
321         }
322     }
323
324     /**
325      * Process the Node update notification. Check for Openflow node and make sure if the bridge is part of the Pipeline before
326      * programming the Pipeline specific flows.
327      */
328     @Override
329     public void onNodeUpdated(NodeUpdated nodeUpdated) {
330         NodeRef ref = nodeUpdated.getNodeRef();
331         InstanceIdentifier<Node> identifier = (InstanceIdentifier<Node>) ref.getValue();
332         logger.info("GOT NOTIFICATION FOR "+identifier.toString());
333         final NodeKey key = identifier.firstKeyOf(Node.class, NodeKey.class);
334         final String nodeId = key.getId().getValue();
335         if (!this.isBridgeInPipeline(nodeId)) {
336             logger.debug("Bridge {} is not in pipeline", nodeId);
337             return;
338         }
339         if (key != null && key.getId().getValue().contains("openflow")) {
340             InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) ref.getValue()).builder();
341             InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder.augmentation(FlowCapableNode.class);
342             final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
343             CheckedFuture readFuture = txChain.newReadWriteTransaction().read(LogicalDatastoreType.OPERATIONAL, path);
344             Futures.addCallback(readFuture, new FutureCallback<Optional<? extends DataObject>>() {
345                 @Override
346                 public void onSuccess(Optional<? extends DataObject> optional) {
347                     if (!optional.isPresent()) {
348                         enqueue(nodeId);
349                     }
350                 }
351
352                 @Override
353                 public void onFailure(Throwable throwable) {
354                     logger.debug(String.format("Can't retrieve node data for node %s. Writing node data with table0.", nodeId));
355                     enqueue(nodeId);
356                 }
357             });
358         }
359     }
360
361     @Override
362     public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
363             final Throwable cause) {
364         logger.error("Failed to export Flow Capable Inventory, Transaction {} failed.",transaction.getIdentifier(),cause);
365     }
366
367     @Override
368     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
369     }
370 }