modified meter and group enum from leaf to typedef.
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Map.Entry;
11 import java.util.Set;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.opendaylight.yangtools.yang.binding.NotificationListener;
54 import org.opendaylight.yangtools.yang.common.RpcError;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class FlowConsumerImpl implements IForwardingRulesManager {
60     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
61     private final FlowEventListener flowEventListener = new FlowEventListener();
62     private Registration<NotificationListener> listener1Reg;
63     private SalFlowService flowService;
64     // private FlowDataListener listener;
65     private FlowDataCommitHandler commitHandler;
66     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
67     private static ConcurrentMap<FlowKey, Flow> installedSwView;
68     private IClusterContainerServices clusterContainerService = null;
69     private IContainer container;
70     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
71     private static ConcurrentMap<Integer, Flow> staticFlows;
72     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
73     /*
74      * Inactive flow list. This is for the global instance of FRM It will
75      * contain all the flow entries which were installed on the global container
76      * when the first container is created.
77      */
78     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
79
80     /*
81      * /* Per node indexing
82      */
83     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
84     private boolean inContainerMode; // being used by global instance only
85
86     public FlowConsumerImpl() {
87         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
88         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
89
90         if (null == flowService) {
91             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
92             System.out.println("Consumer SAL Service is down or NULL.");
93             return;
94         }
95
96         // listener = new FlowDataListener();
97
98         // if (null ==
99         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
100         // listener)) {
101         // logger.error("Failed to listen on flow data modifcation events");
102         // System.out.println("Consumer SAL Service is down or NULL.");
103         // return;
104         // }
105
106         // For switch events
107         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
108
109         if (null == listener1Reg) {
110             logger.error("Listener to listen on flow data modifcation events");
111             System.out.println("Consumer SAL Service is down or NULL.");
112             return;
113         }
114         // addFlowTest();
115         System.out.println("-------------------------------------------------------------------");
116         commitHandler = new FlowDataCommitHandler();
117         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
118         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
119                 IClusterContainerServices.class, this);
120         allocateCaches();
121         /*
122          * If we are not the first cluster node to come up, do not initialize
123          * the static flow entries ordinal
124          */
125         if (staticFlowsOrdinal.size() == 0) {
126             staticFlowsOrdinal.put(0, Integer.valueOf(0));
127         }
128     }
129
130     private void allocateCaches() {
131
132         if (this.clusterContainerService == null) {
133             logger.warn("Un-initialized clusterContainerService, can't create cache");
134             return;
135         }
136
137         try {
138             clusterContainerService.createCache("frm.originalSwView",
139                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140             clusterContainerService.createCache("frm.installedSwView",
141                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142             clusterContainerService
143                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144             clusterContainerService.createCache("frm.staticFlowsOrdinal",
145                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146             clusterContainerService.createCache("frm.inactiveFlows",
147                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150         } catch (CacheConfigException cce) {
151             logger.error("CacheConfigException");
152         } catch (CacheExistException cce) {
153             logger.error("CacheExistException");
154         }
155     }
156
157     private void addFlowTest() {
158         try {
159             NodeRef nodeOne = createNodeRef("foo:node:1");
160             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
161
162             input1.setNode(nodeOne);
163             AddFlowInput firstMsg = input1.build();
164
165             if (null != flowService) {
166                 System.out.println(flowService.toString());
167             } else {
168                 System.out.println("ConsumerFlowService is NULL");
169             }
170             @SuppressWarnings("unused")
171             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
172
173         } catch (Exception e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177     }
178
179     /**
180      * Adds flow to the southbound plugin and our internal database
181      *
182      * @param path
183      * @param dataObject
184      */
185     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
186
187         AddFlowInputBuilder input = new AddFlowInputBuilder();
188         
189         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
190         input.setNode((dataObject).getNode());
191         input.setPriority((dataObject).getPriority());
192         input.setMatch((dataObject).getMatch());
193         input.setCookie((dataObject).getCookie());
194         input.setInstructions((dataObject).getInstructions());
195         dataObject.getMatch().getLayer3Match();
196         for (int i = 0; i < inst.size(); i++) {
197             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
198             System.out.println("i = " + i + inst.get(i).toString());
199         }
200
201         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
202
203         // updating the staticflow cache
204         /*
205          *  Commented out... as in many other places... use of ClusteringServices is breaking things
206          *  insufficient time to debug
207         Integer ordinal = staticFlowsOrdinal.get(0);
208         staticFlowsOrdinal.put(0, ++ordinal);
209         staticFlows.put(ordinal, dataObject);
210         */
211
212         // We send flow to the sounthbound plugin
213         flowService.addFlow(input.build());
214         /*
215          * Commented out as this will also break due to improper use of ClusteringServices
216         updateLocalDatabase((NodeFlow) dataObject, true);
217         */
218     }
219
220     /**
221      * Removes flow to the southbound plugin and our internal database
222      *
223      * @param path
224      * @param dataObject
225      */
226     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
227
228         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
229         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
230         input.setNode((dataObject).getNode());
231         input.setPriority((dataObject).getPriority());
232         input.setMatch((dataObject).getMatch());
233         input.setCookie((dataObject).getCookie());
234         input.setInstructions((dataObject).getInstructions());
235         dataObject.getMatch().getLayer3Match();
236         for (int i = 0; i < inst.size(); i++) {
237             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
238             System.out.println("i = " + i + inst.get(i).toString());
239         }
240
241         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
242
243         // updating the staticflow cache
244         /*
245          * Commented out due to problems caused by improper use of ClusteringServices
246         Integer ordinal = staticFlowsOrdinal.get(0);
247         staticFlowsOrdinal.put(0, ++ordinal);
248         staticFlows.put(ordinal, dataObject);
249         */
250
251         // We send flow to the sounthbound plugin
252         flowService.removeFlow(input.build());
253         /*
254          * Commented out due to problems caused by improper use of ClusteringServices
255         updateLocalDatabase((NodeFlow) dataObject, false);
256         */
257     }
258
259     /**
260      * Update flow to the southbound plugin and our internal database
261      *
262      * @param path
263      * @param dataObject
264      */
265     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
266
267         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
268         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
269         updatedflowbuilder.fieldsFrom(dataObject);
270         input.setNode(dataObject.getNode());
271         input.setUpdatedFlow(updatedflowbuilder.build());
272
273         // updating the staticflow cache
274         /*
275          * Commented out due to problems caused by improper use of ClusteringServices.
276         Integer ordinal = staticFlowsOrdinal.get(0);
277         staticFlowsOrdinal.put(0, ++ordinal);
278         staticFlows.put(ordinal, dataObject);
279         */
280
281         // We send flow to the sounthbound plugin
282         flowService.updateFlow(input.build());
283         /*
284          * Commented out due to problems caused by improper use of ClusteringServices.
285         updateLocalDatabase((NodeFlow) dataObject, true);
286         */
287     }
288
289     @SuppressWarnings("unchecked")
290     private void commitToPlugin(internalTransaction transaction) {
291         Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification().getCreatedConfigurationData().entrySet();
292
293         /*
294          * This little dance is because updatedEntries contains both created and modified entries
295          * The reason I created a new HashSet is because the collections we are returned are immutable.
296          */
297         Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
298         updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
299         updatedEntries.removeAll(createdEntries);
300
301         Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification().getRemovedConfigurationData();
302         transaction.getModification().getOriginalConfigurationData();
303         for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
304             if(entry.getValue() instanceof Flow) {
305                 System.out.println("Coming add cc in FlowDatacommitHandler");
306                 addFlow(entry.getKey(), (Flow) entry.getValue());
307             }
308         }
309         for (@SuppressWarnings("unused")
310         Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
311             if(entry.getValue() instanceof Flow) {
312                 System.out.println("Coming update cc in FlowDatacommitHandler");
313                 updateFlow(entry.getKey(), (Flow) entry.getValue());
314             }
315         }
316
317         for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
318             DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
319             if(removeValue instanceof Flow) {
320                 System.out.println("Coming remove cc in FlowDatacommitHandler");
321                 removeFlow(instanceId, (Flow) removeValue);
322
323             }
324         }
325
326     }
327
328     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
329
330         @SuppressWarnings("unchecked")
331         @Override
332         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
333             // We should verify transaction
334             System.out.println("Coming in FlowDatacommitHandler");
335             internalTransaction transaction = new internalTransaction(modification);
336             transaction.prepareUpdate();
337             return transaction;
338         }
339     }
340
341     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
342
343         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
344
345         @Override
346         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
347             return modification;
348         }
349
350         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
351             this.modification = modification;
352         }
353
354         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
355         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
356         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
357
358         /**
359          * We create a plan which flows will be added, which will be updated and
360          * which will be removed based on our internal state.
361          *
362          */
363         void prepareUpdate() {
364
365             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
366             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
367
368                 // validating the DataObject
369                 DataObject value = entry.getValue();
370                 if(value instanceof Flow ) {
371                     Flow flow = (Flow)value;
372                     boolean status = validate(flow);
373                     if (!status) {
374                         return;
375                     }
376                     // Presence check
377                     /*
378                      * This is breaking due to some improper use of caches...
379                      *
380                     if (flowEntryExists(flow)) {
381                         String error = "Entry with this name on specified table already exists";
382                         logger.warn("Entry with this name on specified table already exists: {}", entry);
383                         logger.error(error);
384                         return;
385                     }
386                     if (originalSwView.containsKey(entry)) {
387                         logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
388                         logger.trace("Aborting to install {}", entry);
389                         continue;
390                     }
391                     */
392                     if (!FRMUtil.validateMatch(flow)) {
393                         logger.error("Not a valid Match");
394                         return;
395                     }
396                     if (!FRMUtil.validateInstructions(flow)) {
397                         logger.error("Not a valid Instruction");
398                         return;
399                     }
400                     /*
401                      * Commented out due to Clustering Services issues
402                      * preparePutEntry(entry.getKey(), flow);
403                      */
404                 }
405             }
406
407             // removals = modification.getRemovedConfigurationData();
408             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
409             for (InstanceIdentifier<?> removal : removedData) {
410                 DataObject value = modification.getOriginalConfigurationData().get(removal);
411                 if (value instanceof Flow) {
412                     removals.put(removal, (Flow) value);
413                 }
414             }
415
416         }
417
418         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
419             Flow original = originalSwView.get(key);
420             if (original != null) {
421                 // It is update for us
422                 System.out.println("Coming update  in FlowDatacommitHandler");
423                 updates.put(key, flow);
424             } else {
425                 // It is addition for us
426                 System.out.println("Coming add in FlowDatacommitHandler");
427                 additions.put(key, flow);
428             }
429         }
430
431         /**
432          * We are OK to go with execution of plan
433          *
434          */
435         @Override
436         public RpcResult<Void> finish() throws IllegalStateException {
437
438             commitToPlugin(this);
439             // We return true if internal transaction is successful.
440             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
441             return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
442         }
443
444         /**
445          *
446          * We should rollback our preparation
447          *
448          */
449         @Override
450         public RpcResult<Void> rollback() throws IllegalStateException {
451             // NOOP - we did not modified any internal state during
452             // requestCommit phase
453             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
454             return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
455
456         }
457
458         public boolean validate(Flow flow) {
459
460             String msg = ""; // Specific part of warn/error log
461
462             boolean result  = true;
463             // flow Name validation
464             if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty()
465                     || !flow.getFlowName().matches(NAMEREGEX)) {
466                 msg = "Invalid Flow name";
467                 result = false;
468             }
469             // Node Validation
470             if (result == true && flow.getNode() == null) {
471                 msg = "Node is null";
472                 result = false;
473             }
474
475             // TODO: Validate we are seeking to program a flow against a valid Node
476
477             if (result == true && flow.getPriority() != null) {
478                 if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
479                     msg = String.format("priority %s is not in the range 0 - 65535",
480                             flow.getPriority());
481                     result = false;
482                 }
483             }
484             if (result == false) {
485                 logger.warn("Invalid Configuration for flow {}. The failure is {}",flow,msg);
486                 logger.error("Invalid Configuration ({})",msg);
487             }
488             return result;
489         }
490
491         private boolean flowEntryExists(Flow flow) {
492             // Flow name has to be unique on per table id basis
493             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
494                 if (entry.getValue().getFlowName().equals(flow.getFlowName())
495                         && entry.getValue().getTableId().equals(flow.getTableId())) {
496                     return true;
497                 }
498             }
499             return false;
500         }
501     }
502
503     final class FlowEventListener implements SalFlowListener {
504
505         List<FlowAdded> addedFlows = new ArrayList<>();
506         List<FlowRemoved> removedFlows = new ArrayList<>();
507         List<FlowUpdated> updatedFlows = new ArrayList<>();
508
509         @Override
510         public void onFlowAdded(FlowAdded notification) {
511             System.out.println("added flow..........................");
512             addedFlows.add(notification);
513         }
514
515         @Override
516         public void onFlowRemoved(FlowRemoved notification) {
517             removedFlows.add(notification);
518         };
519
520         @Override
521         public void onFlowUpdated(FlowUpdated notification) {
522             updatedFlows.add(notification);
523         }
524
525         @Override
526         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
527             // TODO
528         }
529
530         @Override
531         public void onNodeErrorNotification(NodeErrorNotification notification) {
532             // TODO Auto-generated method stub
533
534         }
535
536         @Override
537         public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
538             // TODO Auto-generated method stub
539
540         };
541
542     }
543
544     // Commented out DataChangeListene - to be used by Stats
545
546     // final class FlowDataListener implements DataChangeListener {
547     // private SalFlowService flowService;
548     //
549     // public FlowDataListener() {
550     //
551     // }
552     //
553     // @Override
554     // public void onDataChanged(
555     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
556     // System.out.println("Coming in onDataChange..............");
557     // @SuppressWarnings("unchecked")
558     // Collection<DataObject> additions = (Collection<DataObject>)
559     // change.getCreatedConfigurationData();
560     // // we can check for getCreated, getDeleted or getUpdated from DataChange
561     // Event class
562     // for (DataObject dataObject : additions) {
563     // if (dataObject instanceof NodeFlow) {
564     // NodeRef nodeOne = createNodeRef("foo:node:1");
565     // // validating the dataObject here
566     // AddFlowInputBuilder input = new AddFlowInputBuilder();
567     // input.setNode(((NodeFlow) dataObject).getNode());
568     // input.setNode(nodeOne);
569     // // input.setPriority(((NodeFlow) dataObject).getPriority());
570     // //input.setMatch(((NodeFlow) dataObject).getMatch());
571     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
572     // //input.setCookie(((NodeFlow) dataObject).getCookie());
573     // //input.setAction(((NodeFlow) dataObject).getAction());
574     //
575     // @SuppressWarnings("unused")
576     // Future<RpcResult<java.lang.Void>> result =
577     // flowService.addFlow(input.build());
578     // }
579     // }
580     // }
581     // }
582
583     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
584
585         updateSwViewes(entry, add);
586
587         updateNodeFlowsDB(entry, add);
588
589     }
590
591     /*
592      * Update the node mapped flows database
593      */
594     private static void updateSwViewes(NodeFlow entry, boolean add) {
595         if (add) {
596             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
597             installedSwView.put((FlowKey) entry, (Flow) entry);
598         } else {
599             originalSwView.remove(entry);
600             installedSwView.remove(entry);
601
602         }
603     }
604
605     @Override
606     public List<DataObject> get() {
607
608         List<DataObject> orderedList = new ArrayList<DataObject>();
609         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
610         int maxKey = staticFlowsOrdinal.get(0).intValue();
611         for (int i = 0; i <= maxKey; i++) {
612             Flow entry = flowMap.get(i);
613             if (entry != null) {
614                 orderedList.add(entry);
615             }
616         }
617         return orderedList;
618     }
619
620     @Override
621     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
622         if (this instanceof FlowConsumerImpl) {
623             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
624                 Flow flow = flowEntry.getValue();
625                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
626
627                     return flowEntry.getValue();
628                 }
629             }
630         }
631         return null;
632     }
633
634     /*
635      * Update the node mapped flows database
636      */
637     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
638         Node node = (Node) entry.getNode();
639
640         List<Flow> nodeIndeces = nodeFlows.get(node);
641         if (nodeIndeces == null) {
642             if (!add) {
643                 return;
644             } else {
645                 nodeIndeces = new ArrayList<Flow>();
646             }
647         }
648
649         if (add) {
650             nodeIndeces.add((Flow) entry);
651         } else {
652             nodeIndeces.remove(entry);
653         }
654
655         // Update cache across cluster
656         if (nodeIndeces.isEmpty()) {
657             nodeFlows.remove(node);
658         } else {
659             nodeFlows.put(node, nodeIndeces);
660         }
661     }
662
663     private static NodeRef createNodeRef(String string) {
664         NodeKey key = new NodeKey(new NodeId(string));
665         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
666                 .toInstance();
667
668         return new NodeRef(path);
669     }
670 }