Merge "TableFeatures Update,correcting InstanceIdentifier"
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.utils.GlobalConstants;
24 import org.opendaylight.controller.sal.utils.ServiceHelper;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.controller.switchmanager.Switch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yangtools.concepts.Registration;
54 import org.opendaylight.yangtools.yang.binding.DataObject;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.binding.NotificationListener;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 public class FlowConsumerImpl implements IForwardingRulesManager {
62     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
63     private final FlowEventListener flowEventListener = new FlowEventListener();
64     private Registration<NotificationListener> listener1Reg;
65     private SalFlowService flowService;
66     // private FlowDataListener listener;
67     private FlowDataCommitHandler commitHandler;
68     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
69     private static ConcurrentMap<FlowKey, Flow> installedSwView;
70     private IClusterContainerServices clusterContainerService = null;
71     private IContainer container;
72     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
73     private static ConcurrentMap<Integer, Flow> staticFlows;
74     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
75     /*
76      * Inactive flow list. This is for the global instance of FRM It will
77      * contain all the flow entries which were installed on the global container
78      * when the first container is created.
79      */
80     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
81
82     /*
83      * /* Per node indexing
84      */
85     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
86     private boolean inContainerMode; // being used by global instance only
87
88     public FlowConsumerImpl() {
89         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).child(Flow.class)
90                 .toInstance();
91         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
92
93         if (null == flowService) {
94             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
95             System.out.println("Consumer SAL Service is down or NULL.");
96             return;
97         }
98
99         // listener = new FlowDataListener();
100
101         // if (null ==
102         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
103         // listener)) {
104         // logger.error("Failed to listen on flow data modifcation events");
105         // System.out.println("Consumer SAL Service is down or NULL.");
106         // return;
107         // }
108
109         // For switch events
110         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
111
112         if (null == listener1Reg) {
113             logger.error("Listener to listen on flow data modifcation events");
114             System.out.println("Consumer SAL Service is down or NULL.");
115             return;
116         }
117         // addFlowTest();
118         System.out.println("-------------------------------------------------------------------");
119         allocateCaches();
120         commitHandler = new FlowDataCommitHandler();
121         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
122         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
123                 IClusterContainerServices.class, this);
124         container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
125         /*
126          * If we are not the first cluster node to come up, do not initialize
127          * the static flow entries ordinal
128          */
129         if (staticFlowsOrdinal.size() == 0) {
130             staticFlowsOrdinal.put(0, Integer.valueOf(0));
131         }
132     }
133
134     private void allocateCaches() {
135
136         if (this.clusterContainerService == null) {
137             logger.warn("Un-initialized clusterContainerService, can't create cache");
138             return;
139         }
140
141         try {
142             clusterContainerService.createCache("frm.originalSwView",
143                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144             clusterContainerService.createCache("frm.installedSwView",
145                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146             clusterContainerService
147                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148             clusterContainerService.createCache("frm.staticFlowsOrdinal",
149                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150             clusterContainerService.createCache("frm.inactiveFlows",
151                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
152             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
153             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
154         } catch (CacheConfigException cce) {
155             logger.error("CacheConfigException");
156         } catch (CacheExistException cce) {
157             logger.error("CacheExistException");
158         }
159     }
160
161     private void addFlowTest() {
162         try {
163             NodeRef nodeOne = createNodeRef("foo:node:1");
164             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
165
166             input1.setNode(nodeOne);
167             AddFlowInput firstMsg = input1.build();
168
169             if (null != flowService) {
170                 System.out.println(flowService.toString());
171             } else {
172                 System.out.println("ConsumerFlowService is NULL");
173             }
174             @SuppressWarnings("unused")
175             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
176
177         } catch (Exception e) {
178             // TODO Auto-generated catch block
179             e.printStackTrace();
180         }
181     }
182
183     /**
184      * Adds flow to the southbound plugin and our internal database
185      *
186      * @param path
187      * @param dataObject
188      */
189     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
190
191         AddFlowInputBuilder input = new AddFlowInputBuilder();
192         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
193         input.setNode((dataObject).getNode());
194         input.setPriority((dataObject).getPriority());
195         input.setMatch((dataObject).getMatch());
196         input.setCookie((dataObject).getCookie());
197         input.setInstructions((dataObject).getInstructions());
198         dataObject.getMatch().getLayer3Match();
199         for (int i = 0; i < inst.size(); i++) {
200             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
201             System.out.println("i = " + i + inst.get(i).toString());
202         }
203
204         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
205
206         // updating the staticflow cache
207         Integer ordinal = staticFlowsOrdinal.get(0);
208         staticFlowsOrdinal.put(0, ++ordinal);
209         staticFlows.put(ordinal, dataObject);
210
211         // We send flow to the sounthbound plugin
212         flowService.addFlow(input.build());
213         updateLocalDatabase((NodeFlow) dataObject, true);
214     }
215
216     /**
217      * Removes flow to the southbound plugin and our internal database
218      *
219      * @param path
220      * @param dataObject
221      */
222     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
223
224         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
225         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
226         input.setNode((dataObject).getNode());
227         input.setPriority((dataObject).getPriority());
228         input.setMatch((dataObject).getMatch());
229         input.setCookie((dataObject).getCookie());
230         input.setInstructions((dataObject).getInstructions());
231         dataObject.getMatch().getLayer3Match();
232         for (int i = 0; i < inst.size(); i++) {
233             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
234             System.out.println("i = " + i + inst.get(i).toString());
235         }
236
237         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
238
239         // updating the staticflow cache
240         Integer ordinal = staticFlowsOrdinal.get(0);
241         staticFlowsOrdinal.put(0, ++ordinal);
242         staticFlows.put(ordinal, dataObject);
243
244         // We send flow to the sounthbound plugin
245         flowService.removeFlow(input.build());
246         updateLocalDatabase((NodeFlow) dataObject, false);
247     }
248
249     /**
250      * Update flow to the southbound plugin and our internal database
251      *
252      * @param path
253      * @param dataObject
254      */
255     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
256
257         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
258         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
259         updatedflowbuilder.fieldsFrom(dataObject);
260         input.setUpdatedFlow(updatedflowbuilder.build());
261
262         // updating the staticflow cache
263         Integer ordinal = staticFlowsOrdinal.get(0);
264         staticFlowsOrdinal.put(0, ++ordinal);
265         staticFlows.put(ordinal, dataObject);
266
267         // We send flow to the sounthbound plugin
268         flowService.updateFlow(input.build());
269         updateLocalDatabase((NodeFlow) dataObject, true);
270     }
271
272     @SuppressWarnings("unchecked")
273     private void commitToPlugin(internalTransaction transaction) {
274         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
275             System.out.println("Coming add cc in FlowDatacommitHandler");
276             addFlow(entry.getKey(), entry.getValue());
277         }
278         for (@SuppressWarnings("unused")
279         Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
280             System.out.println("Coming update cc in FlowDatacommitHandler");
281             updateFlow(entry.getKey(), entry.getValue());
282         }
283
284         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
285             System.out.println("Coming remove cc in FlowDatacommitHandler");
286             removeFlow(entry.getKey(), entry.getValue());
287         }
288
289     }
290
291     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
292
293         @SuppressWarnings("unchecked")
294         @Override
295         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
296             // We should verify transaction
297             System.out.println("Coming in FlowDatacommitHandler");
298             internalTransaction transaction = new internalTransaction(modification);
299             transaction.prepareUpdate();
300             return transaction;
301         }
302     }
303
304     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
305
306         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
307
308         @Override
309         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
310             return modification;
311         }
312
313         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
314             this.modification = modification;
315         }
316
317         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
318         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
319         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
320
321         /**
322          * We create a plan which flows will be added, which will be updated and
323          * which will be removed based on our internal state.
324          *
325          */
326         void prepareUpdate() {
327
328             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
329             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
330
331                 // validating the DataObject
332
333                 Status status = validate(container, (NodeFlow) entry);
334                 if (!status.isSuccess()) {
335                     logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
336                     String error = "Invalid Configuration (" + status.getDescription() + ")";
337                     logger.error(error);
338                     return;
339                 }
340                 // Presence check
341                 if (flowEntryExists((NodeFlow) entry)) {
342                     String error = "Entry with this name on specified table already exists";
343                     logger.warn("Entry with this name on specified table already exists: {}", entry);
344                     logger.error(error);
345                     return;
346                 }
347                 if (originalSwView.containsKey(entry)) {
348                     logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
349                     logger.trace("Aborting to install {}", entry);
350                     continue;
351                 }
352                 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
353                     logger.error("Not a valid Match");
354                     return;
355                 }
356                 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
357                     logger.error("Not a valid Instruction");
358                     return;
359                 }
360                 if (entry.getValue() instanceof Flow) {
361                     Flow flow = (Flow) entry.getValue();
362                     preparePutEntry(entry.getKey(), flow);
363                 }
364
365             }
366
367             // removals = modification.getRemovedConfigurationData();
368             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
369             for (InstanceIdentifier<?> removal : removedData) {
370                 DataObject value = modification.getOriginalConfigurationData().get(removal);
371                 if (value instanceof Flow) {
372                     removals.put(removal, (Flow) value);
373                 }
374             }
375
376         }
377
378         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
379             Flow original = originalSwView.get(key);
380             if (original != null) {
381                 // It is update for us
382                 System.out.println("Coming update  in FlowDatacommitHandler");
383                 updates.put(key, flow);
384             } else {
385                 // It is addition for us
386                 System.out.println("Coming add in FlowDatacommitHandler");
387                 additions.put(key, flow);
388             }
389         }
390
391         /**
392          * We are OK to go with execution of plan
393          *
394          */
395         @Override
396         public RpcResult<Void> finish() throws IllegalStateException {
397
398             commitToPlugin(this);
399             // We return true if internal transaction is successful.
400             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
401             return Rpcs.getRpcResult(true, null, null);
402         }
403
404         /**
405          *
406          * We should rollback our preparation
407          *
408          */
409         @Override
410         public RpcResult<Void> rollback() throws IllegalStateException {
411             // NOOP - we did not modified any internal state during
412             // requestCommit phase
413             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
414             return Rpcs.getRpcResult(true, null, null);
415
416         }
417
418         public Status validate(IContainer container, NodeFlow dataObject) {
419
420             // container validation
421             Switch sw = null;
422             Node node = null;
423             String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
424             ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
425                     containerName, this);
426             // flow Name validation
427             if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
428                     || !dataObject.getFlowName().matches(NAMEREGEX)) {
429                 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
430             }
431             // Node Validation
432             if (dataObject.getNode() == null) {
433                 return new Status(StatusCode.BADREQUEST, "Node is null");
434             }
435
436             if (switchManager != null) {
437                 for (Switch device : switchManager.getNetworkDevices()) {
438                     node = (Node) device.getNode();
439                     if (device.getNode().equals(dataObject.getNode())) {
440                         sw = device;
441                         break;
442                     }
443                 }
444                 if (sw == null) {
445                     return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
446                 }
447             } else {
448                 logger.debug("switchmanager is not set yet");
449             }
450
451             if (dataObject.getPriority() != null) {
452                 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
453                     return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
454                             dataObject.getPriority()));
455                 }
456             }
457
458             return new Status(StatusCode.SUCCESS);
459         }
460
461         private boolean flowEntryExists(NodeFlow config) {
462             // Flow name has to be unique on per table id basis
463             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
464                 if (entry.getValue().getFlowName().equals(config.getFlowName())
465                         && entry.getValue().getTableId().equals(config.getTableId())) {
466                     return true;
467                 }
468             }
469             return false;
470         }
471     }
472
473     final class FlowEventListener implements SalFlowListener {
474
475         List<FlowAdded> addedFlows = new ArrayList<>();
476         List<FlowRemoved> removedFlows = new ArrayList<>();
477         List<FlowUpdated> updatedFlows = new ArrayList<>();
478
479         @Override
480         public void onFlowAdded(FlowAdded notification) {
481             System.out.println("added flow..........................");
482             addedFlows.add(notification);
483         }
484
485         @Override
486         public void onFlowRemoved(FlowRemoved notification) {
487             removedFlows.add(notification);
488         };
489
490         @Override
491         public void onFlowUpdated(FlowUpdated notification) {
492             updatedFlows.add(notification);
493         }
494
495         @Override
496         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
497             // TODO
498         }
499
500         @Override
501         public void onNodeErrorNotification(NodeErrorNotification notification) {
502             // TODO Auto-generated method stub
503
504         }
505
506         @Override
507         public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
508             // TODO Auto-generated method stub
509
510         };
511
512     }
513
    // Commented out DataChangeListener - to be used by Stats
515
516     // final class FlowDataListener implements DataChangeListener {
517     // private SalFlowService flowService;
518     //
519     // public FlowDataListener() {
520     //
521     // }
522     //
523     // @Override
524     // public void onDataChanged(
525     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
526     // System.out.println("Coming in onDataChange..............");
527     // @SuppressWarnings("unchecked")
528     // Collection<DataObject> additions = (Collection<DataObject>)
529     // change.getCreatedConfigurationData();
530     // // we can check for getCreated, getDeleted or getUpdated from DataChange
531     // Event class
532     // for (DataObject dataObject : additions) {
533     // if (dataObject instanceof NodeFlow) {
534     // NodeRef nodeOne = createNodeRef("foo:node:1");
535     // // validating the dataObject here
536     // AddFlowInputBuilder input = new AddFlowInputBuilder();
537     // input.setNode(((NodeFlow) dataObject).getNode());
538     // input.setNode(nodeOne);
539     // // input.setPriority(((NodeFlow) dataObject).getPriority());
540     // //input.setMatch(((NodeFlow) dataObject).getMatch());
541     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
542     // //input.setCookie(((NodeFlow) dataObject).getCookie());
543     // //input.setAction(((NodeFlow) dataObject).getAction());
544     //
545     // @SuppressWarnings("unused")
546     // Future<RpcResult<java.lang.Void>> result =
547     // flowService.addFlow(input.build());
548     // }
549     // }
550     // }
551     // }
552
553     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
554
555         updateSwViewes(entry, add);
556
557         updateNodeFlowsDB(entry, add);
558
559     }
560
561     /*
562      * Update the node mapped flows database
563      */
564     private static void updateSwViewes(NodeFlow entry, boolean add) {
565         if (add) {
566             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
567             installedSwView.put((FlowKey) entry, (Flow) entry);
568         } else {
569             originalSwView.remove(entry);
570             installedSwView.remove(entry);
571
572         }
573     }
574
575     @Override
576     public List<DataObject> get() {
577
578         List<DataObject> orderedList = new ArrayList<DataObject>();
579         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
580         int maxKey = staticFlowsOrdinal.get(0).intValue();
581         for (int i = 0; i <= maxKey; i++) {
582             Flow entry = flowMap.get(i);
583             if (entry != null) {
584                 orderedList.add(entry);
585             }
586         }
587         return orderedList;
588     }
589
590     @Override
591     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
592         if (this instanceof FlowConsumerImpl) {
593             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
594                 Flow flow = flowEntry.getValue();
595                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
596
597                     return flowEntry.getValue();
598                 }
599             }
600         }
601         return null;
602     }
603
604     /*
605      * Update the node mapped flows database
606      */
607     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
608         Node node = (Node) entry.getNode();
609
610         List<Flow> nodeIndeces = nodeFlows.get(node);
611         if (nodeIndeces == null) {
612             if (!add) {
613                 return;
614             } else {
615                 nodeIndeces = new ArrayList<Flow>();
616             }
617         }
618
619         if (add) {
620             nodeIndeces.add((Flow) entry);
621         } else {
622             nodeIndeces.remove(entry);
623         }
624
625         // Update cache across cluster
626         if (nodeIndeces.isEmpty()) {
627             nodeFlows.remove(node);
628         } else {
629             nodeFlows.put(node, nodeIndeces);
630         }
631     }
632
633     private static NodeRef createNodeRef(String string) {
634         NodeKey key = new NodeKey(new NodeId(string));
635         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
636                 .toInstance();
637
638         return new NodeRef(path);
639     }
640 }