Added flow and group NSF.
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.utils.GlobalConstants;
24 import org.opendaylight.controller.sal.utils.ServiceHelper;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.controller.switchmanager.Switch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
48 import org.opendaylight.yangtools.concepts.Registration;
49 import org.opendaylight.yangtools.yang.binding.DataObject;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.binding.NotificationListener;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public class FlowConsumerImpl {
57     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
58     private FlowEventListener flowEventListener = new FlowEventListener();
59     private Registration<NotificationListener> listener1Reg;
60     private SalFlowService flowService;
61     // private FlowDataListener listener;
62     private FlowDataCommitHandler commitHandler;
63     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
64     private static ConcurrentMap<FlowKey, Flow> installedSwView;
65     private IClusterContainerServices clusterContainerService = null;
66     private IContainer container;
67     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
68     private static ConcurrentMap<Integer, Flow> staticFlows;
69     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
70     /*
71      * Inactive flow list. This is for the global instance of FRM It will
72      * contain all the flow entries which were installed on the global container
73      * when the first container is created.
74      */
75     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
76
77     /*
78      * /* Per node indexing
79      */
80     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
81     private boolean inContainerMode; // being used by global instance only
82
83     public FlowConsumerImpl() {
84         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
85         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
86
87         if (null == flowService) {
88             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
89             System.out.println("Consumer SAL Service is down or NULL.");
90             return;
91         }
92
93         // listener = new FlowDataListener();
94
95         // if (null ==
96         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
97         // listener)) {
98         // logger.error("Failed to listen on flow data modifcation events");
99         // System.out.println("Consumer SAL Service is down or NULL.");
100         // return;
101         // }
102
103         // For switch events
104         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
105
106         if (null == listener1Reg) {
107             logger.error("Listener to listen on flow data modifcation events");
108             System.out.println("Consumer SAL Service is down or NULL.");
109             return;
110         }
111         // addFlowTest();
112         System.out.println("-------------------------------------------------------------------");
113         allocateCaches();
114         commitHandler = new FlowDataCommitHandler();
115         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
116         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
117                 IClusterContainerServices.class, this);
118         container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
119         /*
120          * If we are not the first cluster node to come up, do not initialize
121          * the static flow entries ordinal
122          */
123         if (staticFlowsOrdinal.size() == 0) {
124             staticFlowsOrdinal.put(0, Integer.valueOf(0));
125         }
126     }
127
128     private void allocateCaches() {
129
130         if (this.clusterContainerService == null) {
131             logger.warn("Un-initialized clusterContainerService, can't create cache");
132             return;
133         }
134
135         try {
136             clusterContainerService.createCache("frm.originalSwView",
137                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
138             clusterContainerService.createCache("frm.installedSwView",
139                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140             clusterContainerService
141                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142             clusterContainerService.createCache("frm.staticFlowsOrdinal",
143                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144             clusterContainerService.createCache("frm.inactiveFlows",
145                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
147             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148         } catch (CacheConfigException cce) {
149             logger.error("CacheConfigException");
150         } catch (CacheExistException cce) {
151             logger.error("CacheExistException");
152         }
153     }
154
155     private void addFlowTest() {
156         try {
157             NodeRef nodeOne = createNodeRef("foo:node:1");
158             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
159
160             input1.setNode(nodeOne);
161             AddFlowInput firstMsg = input1.build();
162
163             if (null != flowService) {
164                 System.out.println(flowService.toString());
165             } else {
166                 System.out.println("ConsumerFlowService is NULL");
167             }
168             @SuppressWarnings("unused")
169             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
170
171         } catch (Exception e) {
172             // TODO Auto-generated catch block
173             e.printStackTrace();
174         }
175     }
176
177     /**
178      * Adds flow to the southbound plugin and our internal database
179      *
180      * @param path
181      * @param dataObject
182      */
183     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
184
185         AddFlowInputBuilder input = new AddFlowInputBuilder();
186         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
187         input.setNode((dataObject).getNode());
188         input.setPriority((dataObject).getPriority());
189         input.setMatch((dataObject).getMatch());
190         input.setCookie((dataObject).getCookie());
191         input.setInstructions((dataObject).getInstructions());
192         dataObject.getMatch().getLayer3Match();
193         for (int i = 0; i < inst.size(); i++) {
194             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
195             System.out.println("i = " + i + inst.get(i).toString());
196         }
197
198         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
199
200         // updating the staticflow cache
201         Integer ordinal = staticFlowsOrdinal.get(0);
202         staticFlowsOrdinal.put(0, ++ordinal);
203         staticFlows.put(ordinal, (Flow) dataObject);
204
205         // We send flow to the sounthbound plugin
206         flowService.addFlow(input.build());
207         updateLocalDatabase((NodeFlow) dataObject, true);
208     }
209
210     /**
211      * Removes flow to the southbound plugin and our internal database
212      *
213      * @param path
214      * @param dataObject
215      */
216     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
217
218         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
219         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
220         input.setNode((dataObject).getNode());
221         input.setPriority((dataObject).getPriority());
222         input.setMatch((dataObject).getMatch());
223         input.setCookie((dataObject).getCookie());
224         input.setInstructions((dataObject).getInstructions());
225         dataObject.getMatch().getLayer3Match();
226         for (int i = 0; i < inst.size(); i++) {
227             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
228             System.out.println("i = " + i + inst.get(i).toString());
229         }
230
231         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
232
233         // updating the staticflow cache
234         Integer ordinal = staticFlowsOrdinal.get(0);
235         staticFlowsOrdinal.put(0, ++ordinal);
236         staticFlows.put(ordinal, dataObject);
237
238         // We send flow to the sounthbound plugin
239         flowService.removeFlow(input.build());
240         updateLocalDatabase((NodeFlow) dataObject, false);
241     }
242
243     @SuppressWarnings("unchecked")
244     private void commitToPlugin(internalTransaction transaction) {
245         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
246             System.out.println("Coming add cc in FlowDatacommitHandler");
247             addFlow(entry.getKey(), entry.getValue());
248         }
249         for (@SuppressWarnings("unused")
250         Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
251             System.out.println("Coming update cc in FlowDatacommitHandler");
252             // updateFlow(entry.getKey(),entry.getValue());
253         }
254
255         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
256             System.out.println("Coming remove cc in FlowDatacommitHandler");
257             removeFlow(entry.getKey(), entry.getValue());
258         }
259
260     }
261
262     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
263
264         @SuppressWarnings("unchecked")
265         @Override
266         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
267             // We should verify transaction
268             System.out.println("Coming in FlowDatacommitHandler");
269             internalTransaction transaction = new internalTransaction(modification);
270             transaction.prepareUpdate();
271             return transaction;
272         }
273     }
274
275     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
276
277         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
278
279         @Override
280         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
281             return modification;
282         }
283
284         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
285             this.modification = modification;
286         }
287
288         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
289         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
290         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
291
292         /**
293          * We create a plan which flows will be added, which will be updated and
294          * which will be removed based on our internal state.
295          *
296          */
297         void prepareUpdate() {
298
299             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
300             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
301
302                 // validating the DataObject
303
304                 Status status = validate(container, (NodeFlow) entry);
305                 if (!status.isSuccess()) {
306                     logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
307                     String error = "Invalid Configuration (" + status.getDescription() + ")";
308                     logger.error(error);
309                     return;
310                 }
311                 // Presence check
312                 if (flowEntryExists((NodeFlow) entry)) {
313                     String error = "Entry with this name on specified table already exists";
314                     logger.warn("Entry with this name on specified table already exists: {}", entry);
315                     logger.error(error);
316                     return;
317                 }
318                 if (originalSwView.containsKey((FlowKey) entry)) {
319                     logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
320                     logger.trace("Aborting to install {}", entry);
321                     continue;
322                 }
323                 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
324                     logger.error("Not a valid Match");
325                     return;
326                 }
327                 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
328                     logger.error("Not a valid Instruction");
329                     return;
330                 }
331                 if (entry.getValue() instanceof Flow) {
332                     Flow flow = (Flow) entry.getValue();
333                     preparePutEntry(entry.getKey(), flow);
334                 }
335
336             }
337
338             // removals = modification.getRemovedConfigurationData();
339             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
340             for (InstanceIdentifier<?> removal : removedData) {
341                 DataObject value = modification.getOriginalConfigurationData().get(removal);
342                 if (value instanceof Flow) {
343                     removals.put(removal, (Flow) value);
344                 }
345             }
346
347         }
348
349         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
350             Flow original = originalSwView.get(key);
351             if (original != null) {
352                 // It is update for us
353                 System.out.println("Coming update  in FlowDatacommitHandler");
354                 updates.put(key, flow);
355             } else {
356                 // It is addition for us
357                 System.out.println("Coming add in FlowDatacommitHandler");
358                 additions.put(key, flow);
359             }
360         }
361
362         /**
363          * We are OK to go with execution of plan
364          *
365          */
366         @Override
367         public RpcResult<Void> finish() throws IllegalStateException {
368
369             commitToPlugin(this);
370             // We return true if internal transaction is successful.
371             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
372             return Rpcs.getRpcResult(true, null, null);
373         }
374
375         /**
376          *
377          * We should rollback our preparation
378          *
379          */
380         @Override
381         public RpcResult<Void> rollback() throws IllegalStateException {
382             // NOOP - we did not modified any internal state during
383             // requestCommit phase
384             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
385             return Rpcs.getRpcResult(true, null, null);
386
387         }
388
389         public Status validate(IContainer container, NodeFlow dataObject) {
390
391             // container validation
392             Switch sw = null;
393             Node node = null;
394             String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
395             ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
396                     containerName, this);
397             // flow Name validation
398             if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
399                     || !dataObject.getFlowName().matches(NAMEREGEX)) {
400                 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
401             }
402             // Node Validation
403             if (dataObject.getNode() == null) {
404                 return new Status(StatusCode.BADREQUEST, "Node is null");
405             }
406
407             if (switchManager != null) {
408                 for (Switch device : switchManager.getNetworkDevices()) {
409                     node = (Node) device.getNode();
410                     if (device.getNode().equals(dataObject.getNode())) {
411                         sw = device;
412                         break;
413                     }
414                 }
415                 if (sw == null) {
416                     return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
417                 }
418             } else {
419                 logger.debug("switchmanager is not set yet");
420             }
421
422             if (dataObject.getPriority() != null) {
423                 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
424                     return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
425                             dataObject.getPriority()));
426                 }
427             }
428
429             return new Status(StatusCode.SUCCESS);
430         }
431
432         private boolean flowEntryExists(NodeFlow config) {
433             // Flow name has to be unique on per table id basis
434             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
435                 if (entry.getValue().getFlowName().equals(config.getFlowName())
436                         && entry.getValue().getTableId().equals(config.getTableId())) {
437                     return true;
438                 }
439             }
440             return false;
441         }
442     }
443
444     final class FlowEventListener implements SalFlowListener {
445
446         List<FlowAdded> addedFlows = new ArrayList<>();
447         List<FlowRemoved> removedFlows = new ArrayList<>();
448         List<FlowUpdated> updatedFlows = new ArrayList<>();
449
450         @Override
451         public void onFlowAdded(FlowAdded notification) {
452             System.out.println("added flow..........................");
453             addedFlows.add(notification);
454         }
455
456         @Override
457         public void onFlowRemoved(FlowRemoved notification) {
458             removedFlows.add(notification);
459         };
460
461         @Override
462         public void onFlowUpdated(FlowUpdated notification) {
463             updatedFlows.add(notification);
464         }
465
466     }
467
468     // Commented out DataChangeListene - to be used by Stats
469
470     // final class FlowDataListener implements DataChangeListener {
471     // private SalFlowService flowService;
472     //
473     // public FlowDataListener() {
474     //
475     // }
476     //
477     // @Override
478     // public void onDataChanged(
479     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
480     // System.out.println("Coming in onDataChange..............");
481     // @SuppressWarnings("unchecked")
482     // Collection<DataObject> additions = (Collection<DataObject>)
483     // change.getCreatedConfigurationData();
484     // // we can check for getCreated, getDeleted or getUpdated from DataChange
485     // Event class
486     // for (DataObject dataObject : additions) {
487     // if (dataObject instanceof NodeFlow) {
488     // NodeRef nodeOne = createNodeRef("foo:node:1");
489     // // validating the dataObject here
490     // AddFlowInputBuilder input = new AddFlowInputBuilder();
491     // input.setNode(((NodeFlow) dataObject).getNode());
492     // input.setNode(nodeOne);
493     // // input.setPriority(((NodeFlow) dataObject).getPriority());
494     // //input.setMatch(((NodeFlow) dataObject).getMatch());
495     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
496     // //input.setCookie(((NodeFlow) dataObject).getCookie());
497     // //input.setAction(((NodeFlow) dataObject).getAction());
498     //
499     // @SuppressWarnings("unused")
500     // Future<RpcResult<java.lang.Void>> result =
501     // flowService.addFlow(input.build());
502     // }
503     // }
504     // }
505     // }
506
507     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
508
509         updateSwViewes(entry, add);
510
511         updateNodeFlowsDB(entry, add);
512
513     }
514
515     /*
516      * Update the node mapped flows database
517      */
518     private static void updateSwViewes(NodeFlow entry, boolean add) {
519         if (add) {
520             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
521             installedSwView.put((FlowKey) entry, (Flow) entry);
522         } else {
523             originalSwView.remove((Flow) entry);
524             installedSwView.remove((FlowKey) entry);
525
526         }
527     }
528
529     /*
530      * Update the node mapped flows database
531      */
532     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
533         Node node = (Node) entry.getNode();
534
535         List<Flow> nodeIndeces = nodeFlows.get(node);
536         if (nodeIndeces == null) {
537             if (!add) {
538                 return;
539             } else {
540                 nodeIndeces = new ArrayList<Flow>();
541             }
542         }
543
544         if (add) {
545             nodeIndeces.add((Flow) entry);
546         } else {
547             nodeIndeces.remove((Flow) entry);
548         }
549
550         // Update cache across cluster
551         if (nodeIndeces.isEmpty()) {
552             nodeFlows.remove(node);
553         } else {
554             nodeFlows.put(node, nodeIndeces);
555         }
556     }
557
558     private static NodeRef createNodeRef(String string) {
559         NodeKey key = new NodeKey(new NodeId(string));
560         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
561                 .toInstance();
562
563         return new NodeRef(path);
564     }
565 }