Merge "Few Validations for Match/Actios in FRMUtil"
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Map.Entry;
11 import java.util.Set;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.yang.binding.DataObject;
44 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
45 import org.opendaylight.yangtools.yang.binding.NotificationListener;
46 import org.opendaylight.yangtools.yang.common.RpcError;
47 import org.opendaylight.yangtools.yang.common.RpcResult;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 public class FlowConsumerImpl implements IForwardingRulesManager {
52     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
53     // private final FlowEventListener flowEventListener = new
54     // FlowEventListener();
55     private Registration<NotificationListener> listener1Reg;
56     private SalFlowService flowService;
57     // private FlowDataListener listener;
58     private FlowDataCommitHandler commitHandler;
59     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
60     private static ConcurrentMap<FlowKey, Flow> installedSwView;
61     private IClusterContainerServices clusterContainerService = null;
62     private IContainer container;
63     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
64     private static ConcurrentMap<Integer, Flow> staticFlows;
65     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
66     /*
67      * Inactive flow list. This is for the global instance of FRM It will
68      * contain all the flow entries which were installed on the global container
69      * when the first container is created.
70      */
71     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
72
73     /*
74      * /* Per node indexing
75      */
76     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
77     private boolean inContainerMode; // being used by global instance only
78
79     public FlowConsumerImpl() {
80         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
81         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
82
83         if (null == flowService) {
84             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
85             return;
86         }
87
88         // listener = new FlowDataListener();
89
90         // if (null ==
91         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
92         // listener)) {
93         // logger.error("Failed to listen on flow data modifcation events");
94         // System.out.println("Consumer SAL Service is down or NULL.");
95         // return;
96         // }
97
98         // For switch events
99         // listener1Reg =
100         // FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
101
102         if (null == listener1Reg) {
103             logger.error("Listener to listen on flow data modifcation events");
104             return;
105         }
106         // addFlowTest();
107         commitHandler = new FlowDataCommitHandler();
108         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
109         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
110                 IClusterContainerServices.class, this);
111         allocateCaches();
112         /*
113          * If we are not the first cluster node to come up, do not initialize
114          * the static flow entries ordinal
115          */
116         if (staticFlowsOrdinal.size() == 0) {
117             staticFlowsOrdinal.put(0, Integer.valueOf(0));
118         }
119     }
120
121     private void allocateCaches() {
122
123         if (this.clusterContainerService == null) {
124             logger.warn("Un-initialized clusterContainerService, can't create cache");
125             return;
126         }
127
128         try {
129             clusterContainerService.createCache("frm.originalSwView",
130                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
131             clusterContainerService.createCache("frm.installedSwView",
132                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
133             clusterContainerService
134                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
135             clusterContainerService.createCache("frm.staticFlowsOrdinal",
136                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
137             clusterContainerService.createCache("frm.inactiveFlows",
138                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
139             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
141         } catch (CacheConfigException cce) {
142             logger.error("CacheConfigException");
143         } catch (CacheExistException cce) {
144             logger.error("CacheExistException");
145         }
146     }
147
148     private void addFlowTest() {
149         try {
150             NodeRef nodeOne = createNodeRef("foo:node:1");
151             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
152
153             input1.setNode(nodeOne);
154             AddFlowInput firstMsg = input1.build();
155
156             if (null != flowService) {
157                 logger.error("ConsumerFlowService is NULL");
158             }
159             @SuppressWarnings("unused")
160             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
161
162         } catch (Exception e) {
163             // TODO Auto-generated catch block
164             e.printStackTrace();
165         }
166     }
167
168     /**
169      * Adds flow to the southbound plugin and our internal database
170      *
171      * @param path
172      * @param dataObject
173      */
174     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
175
176         AddFlowInputBuilder input = new AddFlowInputBuilder();
177
178         input.setNode((dataObject).getNode());
179         input.setPriority((dataObject).getPriority());
180         input.setMatch((dataObject).getMatch());
181         input.setCookie((dataObject).getCookie());
182         input.setInstructions((dataObject).getInstructions());
183         input.setBufferId(dataObject.getBufferId());
184         input.setTableId(dataObject.getTableId());
185         input.setOutPort(dataObject.getOutPort());
186         input.setOutGroup(dataObject.getOutGroup());
187         input.setIdleTimeout(dataObject.getIdleTimeout());
188         input.setHardTimeout(dataObject.getHardTimeout());
189         input.setFlowName(dataObject.getFlowName());
190         input.setFlags(dataObject.getFlags());
191         input.setCookieMask(dataObject.getCookieMask());
192         input.setContainerName(dataObject.getContainerName());
193         input.setBarrier(dataObject.isBarrier());
194         input.setInstallHw(dataObject.isInstallHw());
195         input.setStrict(dataObject.isStrict());
196
197         // updating the staticflow cache
198         /*
199          * Commented out... as in many other places... use of ClusteringServices
200          * is breaking things insufficient time to debug Integer ordinal =
201          * staticFlowsOrdinal.get(0); staticFlowsOrdinal.put(0, ++ordinal);
202          * staticFlows.put(ordinal, dataObject);
203          */
204
205         // We send flow to the sounthbound plugin
206
207         flowService.addFlow(input.build());
208
209         /*
210          * Commented out as this will also break due to improper use of
211          * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, true);
212          */
213     }
214
215     /**
216      * Removes flow to the southbound plugin and our internal database
217      *
218      * @param path
219      * @param dataObject
220      */
221     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
222
223         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
224         input.setNode((dataObject).getNode());
225         input.setPriority((dataObject).getPriority());
226         input.setMatch((dataObject).getMatch());
227         input.setCookie((dataObject).getCookie());
228         input.setInstructions((dataObject).getInstructions());
229         input.setBufferId(dataObject.getBufferId());
230         input.setTableId(dataObject.getTableId());
231         input.setOutPort(dataObject.getOutPort());
232         input.setOutGroup(dataObject.getOutGroup());
233         input.setIdleTimeout(dataObject.getIdleTimeout());
234         input.setHardTimeout(dataObject.getHardTimeout());
235         input.setFlowName(dataObject.getFlowName());
236         input.setFlags(dataObject.getFlags());
237         input.setCookieMask(dataObject.getCookieMask());
238         input.setContainerName(dataObject.getContainerName());
239         input.setBarrier(dataObject.isBarrier());
240         input.setInstallHw(dataObject.isInstallHw());
241         input.setStrict(dataObject.isStrict());
242         // updating the staticflow cache
243         /*
244          * Commented out due to problems caused by improper use of
245          * ClusteringServices Integer ordinal = staticFlowsOrdinal.get(0);
246          * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
247          * dataObject);
248          */
249
250         // We send flow to the sounthbound plugin
251         flowService.removeFlow(input.build());
252
253         /*
254          * Commented out due to problems caused by improper use of
255          * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, false);
256          */
257     }
258
259     /**
260      * Update flow to the southbound plugin and our internal database
261      *
262      * @param path
263      * @param dataObject
264      */
265     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
266
267         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
268         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
269         updatedflowbuilder.fieldsFrom(dataObject);
270         input.setNode(dataObject.getNode());
271         input.setUpdatedFlow(updatedflowbuilder.build());
272
273         // updating the staticflow cache
274         /*
275          * Commented out due to problems caused by improper use of
276          * ClusteringServices. Integer ordinal = staticFlowsOrdinal.get(0);
277          * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
278          * dataObject);
279          */
280
281         // We send flow to the sounthbound plugin
282         flowService.updateFlow(input.build());
283
284         /*
285          * Commented out due to problems caused by improper use of
286          * ClusteringServices. updateLocalDatabase((NodeFlow) dataObject, true);
287          */
288     }
289
290     @SuppressWarnings("unchecked")
291     private void commitToPlugin(internalTransaction transaction) {
292         Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification()
293                 .getCreatedConfigurationData().entrySet();
294
295         /*
296          * This little dance is because updatedEntries contains both created and
297          * modified entries The reason I created a new HashSet is because the
298          * collections we are returned are immutable.
299          */
300         Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
301         updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
302         updatedEntries.removeAll(createdEntries);
303
304         Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification()
305                 .getRemovedConfigurationData();
306         transaction.getModification().getOriginalConfigurationData();
307         for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
308             if (entry.getValue() instanceof Flow) {
309                 logger.debug("Coming add cc in FlowDatacommitHandler");
310                 Flow flow = (Flow) entry.getValue();
311                 boolean status = validate(flow);
312                 if (!status) {
313                     return;
314                 }
315                 addFlow(entry.getKey(), (Flow) entry.getValue());
316             }
317         }
318         for (@SuppressWarnings("unused")
319         Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
320             if (entry.getValue() instanceof Flow) {
321                 logger.debug("Coming update cc in FlowDatacommitHandler");
322                 Flow flow = (Flow) entry.getValue();
323                 boolean status = validate(flow);
324                 if (!status) {
325                     return;
326                 }
327                 updateFlow(entry.getKey(), (Flow) entry.getValue());
328             }
329         }
330
331         for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
332             DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
333             if (removeValue instanceof Flow) {
334                 logger.debug("Coming remove cc in FlowDatacommitHandler");
335                 Flow flow = (Flow) removeValue;
336                 boolean status = validate(flow);
337                 if (!status) {
338                     return;
339                 }
340                 removeFlow(instanceId, (Flow) removeValue);
341
342             }
343         }
344
345     }
346
347     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
348
349         @SuppressWarnings("unchecked")
350         @Override
351         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
352             // We should verify transaction
353             logger.debug("Coming in FlowDatacommitHandler");
354             internalTransaction transaction = new internalTransaction(modification);
355             transaction.prepareUpdate();
356             return transaction;
357         }
358     }
359
360     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
361
362         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
363
364         @Override
365         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
366             return modification;
367         }
368
369         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
370             this.modification = modification;
371         }
372
373         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
374         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
375         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
376
377         /**
378          * We create a plan which flows will be added, which will be updated and
379          * which will be removed based on our internal state.
380          *
381          */
382         void prepareUpdate() {
383
384             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
385             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
386             }
387
388             // removals = modification.getRemovedConfigurationData();
389             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
390             for (InstanceIdentifier<?> removal : removedData) {
391                 DataObject value = modification.getOriginalConfigurationData().get(removal);
392                 if (value instanceof Flow) {
393                     removals.put(removal, (Flow) value);
394                 }
395             }
396
397         }
398
399         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
400             Flow original = originalSwView.get(key);
401             if (original != null) {
402                 // It is update for us
403                 updates.put(key, flow);
404             } else {
405                 // It is addition for us
406                 additions.put(key, flow);
407             }
408         }
409
410         /**
411          * We are OK to go with execution of plan
412          *
413          */
414         @Override
415         public RpcResult<Void> finish() throws IllegalStateException {
416
417             commitToPlugin(this);
418             // We return true if internal transaction is successful.
419             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
420             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
421         }
422
423         /**
424          *
425          * We should rollback our preparation
426          *
427          */
428         @Override
429         public RpcResult<Void> rollback() throws IllegalStateException {
430             // NOOP - we did not modified any internal state during
431             // requestCommit phase
432             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
433             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
434
435         }
436
437         private boolean flowEntryExists(Flow flow) {
438             // Flow name has to be unique on per table id basis
439             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
440                 if (entry.getValue().getFlowName().equals(flow.getFlowName())
441                         && entry.getValue().getTableId().equals(flow.getTableId())) {
442                     return true;
443                 }
444             }
445             return false;
446         }
447     }
448
449     // Commented out DataChangeListene - to be used by Stats
450
451     // final class FlowDataListener implements DataChangeListener {
452     // private SalFlowService flowService;
453     //
454     // public FlowDataListener() {
455     //
456     // }
457     //
458     // @Override
459     // public void onDataChanged(
460     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
461     // System.out.println("Coming in onDataChange..............");
462     // @SuppressWarnings("unchecked")
463     // Collection<DataObject> additions = (Collection<DataObject>)
464     // change.getCreatedConfigurationData();
465     // // we can check for getCreated, getDeleted or getUpdated from DataChange
466     // Event class
467     // for (DataObject dataObject : additions) {
468     // if (dataObject instanceof NodeFlow) {
469     // NodeRef nodeOne = createNodeRef("foo:node:1");
470     // // validating the dataObject here
471     // AddFlowInputBuilder input = new AddFlowInputBuilder();
472     // input.setNode(((NodeFlow) dataObject).getNode());
473     // input.setNode(nodeOne);
474     // // input.setPriority(((NodeFlow) dataObject).getPriority());
475     // //input.setMatch(((NodeFlow) dataObject).getMatch());
476     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
477     // //input.setCookie(((NodeFlow) dataObject).getCookie());
478     // //input.setAction(((NodeFlow) dataObject).getAction());
479     //
480     // @SuppressWarnings("unused")
481     // Future<RpcResult<java.lang.Void>> result =
482     // flowService.addFlow(input.build());
483     // }
484     // }
485     // }
486     // }
487
488     public boolean validate(Flow flow) {
489
490         String msg = ""; // Specific part of warn/error log
491
492         boolean result = true;
493         // flow Name validation
494         if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty() || !flow.getFlowName().matches(NAMEREGEX)) {
495             msg = "Invalid Flow name";
496             result = false;
497         }
498         // Node Validation
499         if (result == true && flow.getNode() == null) {
500             msg = "Node is null";
501             result = false;
502         }
503
504         // TODO: Validate we are seeking to program a flow against a valid
505         // Node
506
507         if (result == true && flow.getPriority() != null) {
508             if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
509                 msg = String.format("priority %s is not in the range 0 - 65535", flow.getPriority());
510                 result = false;
511             }
512         }
513
514         // Presence check
515         /*
516          * This is breaking due to some improper use of caches...
517          *
518          * if (flowEntryExists(flow)) { String error =
519          * "Entry with this name on specified table already exists";
520          * logger.warn(
521          * "Entry with this name on specified table already exists: {}" ,
522          * entry); logger.error(error); return; } if
523          * (originalSwView.containsKey(entry)) { logger.warn(
524          * "Operation Rejected: A flow with same match and priority exists on the target node"
525          * ); logger.trace("Aborting to install {}", entry); continue; }
526          */
527         if (!FRMUtil.validateMatch(flow)) {
528             logger.error("Not a valid Match");
529             result = false;
530         }
531         if (!FRMUtil.validateInstructions(flow)) {
532             logger.error("Not a valid Instruction");
533             result = false;
534         }
535         if (result == false) {
536             logger.warn("Invalid Configuration for flow {}. The failure is {}", flow, msg);
537             logger.error("Invalid Configuration ({})", msg);
538         }
539         return result;
540     }
541
542     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
543
544         updateSwViewes(entry, add);
545
546         updateNodeFlowsDB(entry, add);
547
548     }
549
550     /*
551      * Update the node mapped flows database
552      */
553     private static void updateSwViewes(NodeFlow entry, boolean add) {
554         if (add) {
555             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
556             installedSwView.put((FlowKey) entry, (Flow) entry);
557         } else {
558             originalSwView.remove(entry);
559             installedSwView.remove(entry);
560
561         }
562     }
563
564     @Override
565     public List<DataObject> get() {
566
567         List<DataObject> orderedList = new ArrayList<DataObject>();
568         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
569         int maxKey = staticFlowsOrdinal.get(0).intValue();
570         for (int i = 0; i <= maxKey; i++) {
571             Flow entry = flowMap.get(i);
572             if (entry != null) {
573                 orderedList.add(entry);
574             }
575         }
576         return orderedList;
577     }
578
579     @Override
580     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
581         if (this instanceof FlowConsumerImpl) {
582             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
583                 Flow flow = flowEntry.getValue();
584                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
585
586                     return flowEntry.getValue();
587                 }
588             }
589         }
590         return null;
591     }
592
593     /*
594      * Update the node mapped flows database
595      */
596     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
597         Node node = (Node) entry.getNode();
598
599         List<Flow> nodeIndeces = nodeFlows.get(node);
600         if (nodeIndeces == null) {
601             if (!add) {
602                 return;
603             } else {
604                 nodeIndeces = new ArrayList<Flow>();
605             }
606         }
607
608         if (add) {
609             nodeIndeces.add((Flow) entry);
610         } else {
611             nodeIndeces.remove(entry);
612         }
613
614         // Update cache across cluster
615         if (nodeIndeces.isEmpty()) {
616             nodeFlows.remove(node);
617         } else {
618             nodeFlows.put(node, nodeIndeces);
619         }
620     }
621
622     private static NodeRef createNodeRef(String string) {
623         NodeKey key = new NodeKey(new NodeId(string));
624         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
625                 .toInstance();
626
627         return new NodeRef(path);
628     }
629 }