// Repository browser header (revision identifier): d17bf8da11dbbeb9f012589c49297faf535b71bf
// [controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.regex.Pattern;

import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.IContainer;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.ServiceHelper;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.opendaylight.controller.switchmanager.ISwitchManager;
import org.opendaylight.controller.switchmanager.Switch;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.NotificationListener;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58
59 public class FlowConsumerImpl implements IForwardingRulesManager {
60     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
61     private final FlowEventListener flowEventListener = new FlowEventListener();
62     private Registration<NotificationListener> listener1Reg;
63     private SalFlowService flowService;
64     // private FlowDataListener listener;
65     private FlowDataCommitHandler commitHandler;
66     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
67     private static ConcurrentMap<FlowKey, Flow> installedSwView;
68     private IClusterContainerServices clusterContainerService = null;
69     private IContainer container;
70     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
71     private static ConcurrentMap<Integer, Flow> staticFlows;
72     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
73     /*
74      * Inactive flow list. This is for the global instance of FRM It will
75      * contain all the flow entries which were installed on the global container
76      * when the first container is created.
77      */
78     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
79
80     /*
81      * /* Per node indexing
82      */
83     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
84     private boolean inContainerMode; // being used by global instance only
85
86     public FlowConsumerImpl() {
87         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
88         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
89
90         if (null == flowService) {
91             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
92             System.out.println("Consumer SAL Service is down or NULL.");
93             return;
94         }
95
96         // listener = new FlowDataListener();
97
98         // if (null ==
99         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
100         // listener)) {
101         // logger.error("Failed to listen on flow data modifcation events");
102         // System.out.println("Consumer SAL Service is down or NULL.");
103         // return;
104         // }
105
106         // For switch events
107         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
108
109         if (null == listener1Reg) {
110             logger.error("Listener to listen on flow data modifcation events");
111             System.out.println("Consumer SAL Service is down or NULL.");
112             return;
113         }
114         // addFlowTest();
115         System.out.println("-------------------------------------------------------------------");
116         allocateCaches();
117         commitHandler = new FlowDataCommitHandler();
118         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
119         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
120                 IClusterContainerServices.class, this);
121         container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
122         /*
123          * If we are not the first cluster node to come up, do not initialize
124          * the static flow entries ordinal
125          */
126         if (staticFlowsOrdinal.size() == 0) {
127             staticFlowsOrdinal.put(0, Integer.valueOf(0));
128         }
129     }
130
131     private void allocateCaches() {
132
133         if (this.clusterContainerService == null) {
134             logger.warn("Un-initialized clusterContainerService, can't create cache");
135             return;
136         }
137
138         try {
139             clusterContainerService.createCache("frm.originalSwView",
140                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
141             clusterContainerService.createCache("frm.installedSwView",
142                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
143             clusterContainerService
144                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145             clusterContainerService.createCache("frm.staticFlowsOrdinal",
146                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
147             clusterContainerService.createCache("frm.inactiveFlows",
148                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
151         } catch (CacheConfigException cce) {
152             logger.error("CacheConfigException");
153         } catch (CacheExistException cce) {
154             logger.error("CacheExistException");
155         }
156     }
157
158     private void addFlowTest() {
159         try {
160             NodeRef nodeOne = createNodeRef("foo:node:1");
161             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
162
163             input1.setNode(nodeOne);
164             AddFlowInput firstMsg = input1.build();
165
166             if (null != flowService) {
167                 System.out.println(flowService.toString());
168             } else {
169                 System.out.println("ConsumerFlowService is NULL");
170             }
171             @SuppressWarnings("unused")
172             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
173
174         } catch (Exception e) {
175             // TODO Auto-generated catch block
176             e.printStackTrace();
177         }
178     }
179
180     /**
181      * Adds flow to the southbound plugin and our internal database
182      *
183      * @param path
184      * @param dataObject
185      */
186     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
187
188         AddFlowInputBuilder input = new AddFlowInputBuilder();
189         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
190         input.setNode((dataObject).getNode());
191         input.setPriority((dataObject).getPriority());
192         input.setMatch((dataObject).getMatch());
193         input.setCookie((dataObject).getCookie());
194         input.setInstructions((dataObject).getInstructions());
195         dataObject.getMatch().getLayer3Match();
196         for (int i = 0; i < inst.size(); i++) {
197             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
198             System.out.println("i = " + i + inst.get(i).toString());
199         }
200
201         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
202
203         // updating the staticflow cache
204         Integer ordinal = staticFlowsOrdinal.get(0);
205         staticFlowsOrdinal.put(0, ++ordinal);
206         staticFlows.put(ordinal, dataObject);
207
208         // We send flow to the sounthbound plugin
209         flowService.addFlow(input.build());
210         updateLocalDatabase((NodeFlow) dataObject, true);
211     }
212
213     /**
214      * Removes flow to the southbound plugin and our internal database
215      *
216      * @param path
217      * @param dataObject
218      */
219     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
220
221         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
222         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
223         input.setNode((dataObject).getNode());
224         input.setPriority((dataObject).getPriority());
225         input.setMatch((dataObject).getMatch());
226         input.setCookie((dataObject).getCookie());
227         input.setInstructions((dataObject).getInstructions());
228         dataObject.getMatch().getLayer3Match();
229         for (int i = 0; i < inst.size(); i++) {
230             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
231             System.out.println("i = " + i + inst.get(i).toString());
232         }
233
234         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
235
236         // updating the staticflow cache
237         Integer ordinal = staticFlowsOrdinal.get(0);
238         staticFlowsOrdinal.put(0, ++ordinal);
239         staticFlows.put(ordinal, dataObject);
240
241         // We send flow to the sounthbound plugin
242         flowService.removeFlow(input.build());
243         updateLocalDatabase((NodeFlow) dataObject, false);
244     }
245
246     /**
247      * Update flow to the southbound plugin and our internal database
248      *
249      * @param path
250      * @param dataObject
251      */
252     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
253
254         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
255         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
256         updatedflowbuilder.fieldsFrom(dataObject);
257         input.setUpdatedFlow(updatedflowbuilder.build());
258
259         // updating the staticflow cache
260         Integer ordinal = staticFlowsOrdinal.get(0);
261         staticFlowsOrdinal.put(0, ++ordinal);
262         staticFlows.put(ordinal, dataObject);
263
264         // We send flow to the sounthbound plugin
265         flowService.updateFlow(input.build());
266         updateLocalDatabase((NodeFlow) dataObject, true);
267     }
268
269     @SuppressWarnings("unchecked")
270     private void commitToPlugin(internalTransaction transaction) {
271         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
272             System.out.println("Coming add cc in FlowDatacommitHandler");
273             addFlow(entry.getKey(), entry.getValue());
274         }
275         for (@SuppressWarnings("unused")
276         Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
277             System.out.println("Coming update cc in FlowDatacommitHandler");
278             updateFlow(entry.getKey(), entry.getValue());
279         }
280
281         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
282             System.out.println("Coming remove cc in FlowDatacommitHandler");
283             removeFlow(entry.getKey(), entry.getValue());
284         }
285
286     }
287
288     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
289
290         @SuppressWarnings("unchecked")
291         @Override
292         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
293             // We should verify transaction
294             System.out.println("Coming in FlowDatacommitHandler");
295             internalTransaction transaction = new internalTransaction(modification);
296             transaction.prepareUpdate();
297             return transaction;
298         }
299     }
300
301     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
302
303         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
304
305         @Override
306         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
307             return modification;
308         }
309
310         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
311             this.modification = modification;
312         }
313
314         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
315         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
316         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
317
318         /**
319          * We create a plan which flows will be added, which will be updated and
320          * which will be removed based on our internal state.
321          *
322          */
323         void prepareUpdate() {
324
325             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
326             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
327
328                 // validating the DataObject
329
330                 Status status = validate(container, (NodeFlow) entry);
331                 if (!status.isSuccess()) {
332                     logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
333                     String error = "Invalid Configuration (" + status.getDescription() + ")";
334                     logger.error(error);
335                     return;
336                 }
337                 // Presence check
338                 if (flowEntryExists((NodeFlow) entry)) {
339                     String error = "Entry with this name on specified table already exists";
340                     logger.warn("Entry with this name on specified table already exists: {}", entry);
341                     logger.error(error);
342                     return;
343                 }
344                 if (originalSwView.containsKey(entry)) {
345                     logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
346                     logger.trace("Aborting to install {}", entry);
347                     continue;
348                 }
349                 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
350                     logger.error("Not a valid Match");
351                     return;
352                 }
353                 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
354                     logger.error("Not a valid Instruction");
355                     return;
356                 }
357                 if (entry.getValue() instanceof Flow) {
358                     Flow flow = (Flow) entry.getValue();
359                     preparePutEntry(entry.getKey(), flow);
360                 }
361
362             }
363
364             // removals = modification.getRemovedConfigurationData();
365             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
366             for (InstanceIdentifier<?> removal : removedData) {
367                 DataObject value = modification.getOriginalConfigurationData().get(removal);
368                 if (value instanceof Flow) {
369                     removals.put(removal, (Flow) value);
370                 }
371             }
372
373         }
374
375         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
376             Flow original = originalSwView.get(key);
377             if (original != null) {
378                 // It is update for us
379                 System.out.println("Coming update  in FlowDatacommitHandler");
380                 updates.put(key, flow);
381             } else {
382                 // It is addition for us
383                 System.out.println("Coming add in FlowDatacommitHandler");
384                 additions.put(key, flow);
385             }
386         }
387
388         /**
389          * We are OK to go with execution of plan
390          *
391          */
392         @Override
393         public RpcResult<Void> finish() throws IllegalStateException {
394
395             commitToPlugin(this);
396             // We return true if internal transaction is successful.
397             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
398             return Rpcs.getRpcResult(true, null, null);
399         }
400
401         /**
402          *
403          * We should rollback our preparation
404          *
405          */
406         @Override
407         public RpcResult<Void> rollback() throws IllegalStateException {
408             // NOOP - we did not modified any internal state during
409             // requestCommit phase
410             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
411             return Rpcs.getRpcResult(true, null, null);
412
413         }
414
415         public Status validate(IContainer container, NodeFlow dataObject) {
416
417             // container validation
418             Switch sw = null;
419             Node node = null;
420             String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
421             ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
422                     containerName, this);
423             // flow Name validation
424             if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
425                     || !dataObject.getFlowName().matches(NAMEREGEX)) {
426                 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
427             }
428             // Node Validation
429             if (dataObject.getNode() == null) {
430                 return new Status(StatusCode.BADREQUEST, "Node is null");
431             }
432
433             if (switchManager != null) {
434                 for (Switch device : switchManager.getNetworkDevices()) {
435                     node = (Node) device.getNode();
436                     if (device.getNode().equals(dataObject.getNode())) {
437                         sw = device;
438                         break;
439                     }
440                 }
441                 if (sw == null) {
442                     return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
443                 }
444             } else {
445                 logger.debug("switchmanager is not set yet");
446             }
447
448             if (dataObject.getPriority() != null) {
449                 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
450                     return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
451                             dataObject.getPriority()));
452                 }
453             }
454
455             return new Status(StatusCode.SUCCESS);
456         }
457
458         private boolean flowEntryExists(NodeFlow config) {
459             // Flow name has to be unique on per table id basis
460             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
461                 if (entry.getValue().getFlowName().equals(config.getFlowName())
462                         && entry.getValue().getTableId().equals(config.getTableId())) {
463                     return true;
464                 }
465             }
466             return false;
467         }
468     }
469
470     final class FlowEventListener implements SalFlowListener {
471
472         List<FlowAdded> addedFlows = new ArrayList<>();
473         List<FlowRemoved> removedFlows = new ArrayList<>();
474         List<FlowUpdated> updatedFlows = new ArrayList<>();
475
476         @Override
477         public void onFlowAdded(FlowAdded notification) {
478             System.out.println("added flow..........................");
479             addedFlows.add(notification);
480         }
481
482         @Override
483         public void onFlowRemoved(FlowRemoved notification) {
484             removedFlows.add(notification);
485         };
486
487         @Override
488         public void onFlowUpdated(FlowUpdated notification) {
489             updatedFlows.add(notification);
490         }
491
492         @Override
493         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
494             // TODO
495         };
496
497     }
498
499     // Commented out DataChangeListene - to be used by Stats
500
501     // final class FlowDataListener implements DataChangeListener {
502     // private SalFlowService flowService;
503     //
504     // public FlowDataListener() {
505     //
506     // }
507     //
508     // @Override
509     // public void onDataChanged(
510     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
511     // System.out.println("Coming in onDataChange..............");
512     // @SuppressWarnings("unchecked")
513     // Collection<DataObject> additions = (Collection<DataObject>)
514     // change.getCreatedConfigurationData();
515     // // we can check for getCreated, getDeleted or getUpdated from DataChange
516     // Event class
517     // for (DataObject dataObject : additions) {
518     // if (dataObject instanceof NodeFlow) {
519     // NodeRef nodeOne = createNodeRef("foo:node:1");
520     // // validating the dataObject here
521     // AddFlowInputBuilder input = new AddFlowInputBuilder();
522     // input.setNode(((NodeFlow) dataObject).getNode());
523     // input.setNode(nodeOne);
524     // // input.setPriority(((NodeFlow) dataObject).getPriority());
525     // //input.setMatch(((NodeFlow) dataObject).getMatch());
526     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
527     // //input.setCookie(((NodeFlow) dataObject).getCookie());
528     // //input.setAction(((NodeFlow) dataObject).getAction());
529     //
530     // @SuppressWarnings("unused")
531     // Future<RpcResult<java.lang.Void>> result =
532     // flowService.addFlow(input.build());
533     // }
534     // }
535     // }
536     // }
537
538     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
539
540         updateSwViewes(entry, add);
541
542         updateNodeFlowsDB(entry, add);
543
544     }
545
546     /*
547      * Update the node mapped flows database
548      */
549     private static void updateSwViewes(NodeFlow entry, boolean add) {
550         if (add) {
551             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
552             installedSwView.put((FlowKey) entry, (Flow) entry);
553         } else {
554             originalSwView.remove(entry);
555             installedSwView.remove(entry);
556
557         }
558     }
559
560     @Override
561     public List<DataObject> get() {
562
563         List<DataObject> orderedList = new ArrayList<DataObject>();
564         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
565         int maxKey = staticFlowsOrdinal.get(0).intValue();
566         for (int i = 0; i <= maxKey; i++) {
567             Flow entry = flowMap.get(i);
568             if (entry != null) {
569                 orderedList.add(entry);
570             }
571         }
572         return orderedList;
573     }
574
575     @Override
576     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
577         if (this instanceof FlowConsumerImpl) {
578             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
579                 Flow flow = flowEntry.getValue();
580                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
581
582                     return flowEntry.getValue();
583                 }
584             }
585         }
586         return null;
587     }
588
589     /*
590      * Update the node mapped flows database
591      */
592     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
593         Node node = (Node) entry.getNode();
594
595         List<Flow> nodeIndeces = nodeFlows.get(node);
596         if (nodeIndeces == null) {
597             if (!add) {
598                 return;
599             } else {
600                 nodeIndeces = new ArrayList<Flow>();
601             }
602         }
603
604         if (add) {
605             nodeIndeces.add((Flow) entry);
606         } else {
607             nodeIndeces.remove(entry);
608         }
609
610         // Update cache across cluster
611         if (nodeIndeces.isEmpty()) {
612             nodeFlows.remove(node);
613         } else {
614             nodeFlows.put(node, nodeIndeces);
615         }
616     }
617
618     private static NodeRef createNodeRef(String string) {
619         NodeKey key = new NodeKey(new NodeId(string));
620         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
621                 .toInstance();
622
623         return new NodeRef(path);
624     }
625 }