Merge "Added transactions statistics for BI Broker."
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Map.Entry;
11 import java.util.Set;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.NotificationListener;
53 import org.opendaylight.yangtools.yang.common.RpcError;
54 import org.opendaylight.yangtools.yang.common.RpcResult;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public class FlowConsumerImpl implements IForwardingRulesManager {
59     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
60     private final FlowEventListener flowEventListener = new FlowEventListener();
61     private Registration<NotificationListener> listener1Reg;
62     private SalFlowService flowService;
63     // private FlowDataListener listener;
64     private FlowDataCommitHandler commitHandler;
65     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
66     private static ConcurrentMap<FlowKey, Flow> installedSwView;
67     private IClusterContainerServices clusterContainerService = null;
68     private IContainer container;
69     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
70     private static ConcurrentMap<Integer, Flow> staticFlows;
71     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
    /*
     * Inactive flow list. This is for the global instance of FRM. It will
     * contain all the flow entries which were installed on the global
     * container when the first container is created.
     */
77     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
78
    /*
     * Per node indexing
     */
82     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
83     private boolean inContainerMode; // being used by global instance only
84
85     public FlowConsumerImpl() {
86         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
87         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
88
89         if (null == flowService) {
90             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
91             return;
92         }
93
94         // listener = new FlowDataListener();
95
96         // if (null ==
97         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
98         // listener)) {
99         // logger.error("Failed to listen on flow data modifcation events");
100         // System.out.println("Consumer SAL Service is down or NULL.");
101         // return;
102         // }
103
104         // For switch events
105         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
106
107         if (null == listener1Reg) {
108             logger.error("Listener to listen on flow data modifcation events");
109             return;
110         }
111         // addFlowTest();
112         commitHandler = new FlowDataCommitHandler();
113         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
114         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
115                 IClusterContainerServices.class, this);
116         allocateCaches();
117         /*
118          * If we are not the first cluster node to come up, do not initialize
119          * the static flow entries ordinal
120          */
121         if (staticFlowsOrdinal.size() == 0) {
122             staticFlowsOrdinal.put(0, Integer.valueOf(0));
123         }
124     }
125
126     private void allocateCaches() {
127
128         if (this.clusterContainerService == null) {
129             logger.warn("Un-initialized clusterContainerService, can't create cache");
130             return;
131         }
132
133         try {
134             clusterContainerService.createCache("frm.originalSwView",
135                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
136             clusterContainerService.createCache("frm.installedSwView",
137                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
138             clusterContainerService
139                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140             clusterContainerService.createCache("frm.staticFlowsOrdinal",
141                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142             clusterContainerService.createCache("frm.inactiveFlows",
143                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146         } catch (CacheConfigException cce) {
147             logger.error("CacheConfigException");
148         } catch (CacheExistException cce) {
149             logger.error("CacheExistException");
150         }
151     }
152
153     private void addFlowTest() {
154         try {
155             NodeRef nodeOne = createNodeRef("foo:node:1");
156             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
157
158             input1.setNode(nodeOne);
159             AddFlowInput firstMsg = input1.build();
160
161             if (null == flowService) {
162                 logger.error("ConsumerFlowService is NULL");
163             }
164             @SuppressWarnings("unused")
165             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
166
167         } catch (Exception e) {
168             // TODO Auto-generated catch block
169             e.printStackTrace();
170         }
171     }
172
173     /**
174      * Adds flow to the southbound plugin and our internal database
175      *
176      * @param path
177      * @param dataObject
178      */
179     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
180
181         AddFlowInputBuilder input = new AddFlowInputBuilder();
182
183         input.setNode((dataObject).getNode());
184         input.setPriority((dataObject).getPriority());
185         input.setMatch((dataObject).getMatch());
186         input.setCookie((dataObject).getCookie());
187         input.setInstructions((dataObject).getInstructions());
188         input.setBufferId(dataObject.getBufferId());
189         input.setTableId(dataObject.getTableId());
190         input.setOutPort(dataObject.getOutPort());
191         input.setOutGroup(dataObject.getOutGroup());
192         input.setIdleTimeout(dataObject.getIdleTimeout());
193         input.setHardTimeout(dataObject.getHardTimeout());
194         input.setFlowName(dataObject.getFlowName());
195         input.setFlags(dataObject.getFlags());
196         input.setCookieMask(dataObject.getCookieMask());
197         input.setContainerName(dataObject.getContainerName());
198         input.setBarrier(dataObject.isBarrier());
199         input.setInstallHw(dataObject.isInstallHw());
200         input.setStrict(dataObject.isStrict());
201
202         // updating the staticflow cache
203         /*
204          * Commented out... as in many other places... use of ClusteringServices
205          * is breaking things insufficient time to debug Integer ordinal =
206          * staticFlowsOrdinal.get(0); staticFlowsOrdinal.put(0, ++ordinal);
207          * staticFlows.put(ordinal, dataObject);
208          */
209
210         // We send flow to the sounthbound plugin
211
212         flowService.addFlow(input.build());
213
214         /*
215          * Commented out as this will also break due to improper use of
216          * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, true);
217          */
218     }
219
220     /**
221      * Removes flow to the southbound plugin and our internal database
222      *
223      * @param path
224      * @param dataObject
225      */
226     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
227
228         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
229         input.setNode((dataObject).getNode());
230         input.setPriority((dataObject).getPriority());
231         input.setMatch((dataObject).getMatch());
232         input.setCookie((dataObject).getCookie());
233         input.setInstructions((dataObject).getInstructions());
234         input.setBufferId(dataObject.getBufferId());
235         input.setTableId(dataObject.getTableId());
236         input.setOutPort(dataObject.getOutPort());
237         input.setOutGroup(dataObject.getOutGroup());
238         input.setIdleTimeout(dataObject.getIdleTimeout());
239         input.setHardTimeout(dataObject.getHardTimeout());
240         input.setFlowName(dataObject.getFlowName());
241         input.setFlags(dataObject.getFlags());
242         input.setCookieMask(dataObject.getCookieMask());
243         input.setContainerName(dataObject.getContainerName());
244         input.setBarrier(dataObject.isBarrier());
245         input.setInstallHw(dataObject.isInstallHw());
246         input.setStrict(dataObject.isStrict());
247         // updating the staticflow cache
248         /*
249          * Commented out due to problems caused by improper use of
250          * ClusteringServices Integer ordinal = staticFlowsOrdinal.get(0);
251          * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
252          * dataObject);
253          */
254
255         // We send flow to the sounthbound plugin
256         flowService.removeFlow(input.build());
257
258         /*
259          * Commented out due to problems caused by improper use of
260          * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, false);
261          */
262     }
263
264     /**
265      * Update flow to the southbound plugin and our internal database
266      *
267      * @param path
268      * @param dataObject
269      */
270     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
271
272         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
273         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
274         updatedflowbuilder.fieldsFrom(dataObject);
275         input.setNode(dataObject.getNode());
276         input.setUpdatedFlow(updatedflowbuilder.build());
277
278         // updating the staticflow cache
279         /*
280          * Commented out due to problems caused by improper use of
281          * ClusteringServices. Integer ordinal = staticFlowsOrdinal.get(0);
282          * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
283          * dataObject);
284          */
285
286         // We send flow to the sounthbound plugin
287         flowService.updateFlow(input.build());
288
289         /*
290          * Commented out due to problems caused by improper use of
291          * ClusteringServices. updateLocalDatabase((NodeFlow) dataObject, true);
292          */
293     }
294
295     @SuppressWarnings("unchecked")
296     private void commitToPlugin(internalTransaction transaction) {
297         Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification()
298                 .getCreatedConfigurationData().entrySet();
299
300         /*
301          * This little dance is because updatedEntries contains both created and
302          * modified entries The reason I created a new HashSet is because the
303          * collections we are returned are immutable.
304          */
305         Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
306         updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
307         updatedEntries.removeAll(createdEntries);
308
309         Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification()
310                 .getRemovedConfigurationData();
311         transaction.getModification().getOriginalConfigurationData();
312         for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
313             if (entry.getValue() instanceof Flow) {
314                 logger.debug("Coming add cc in FlowDatacommitHandler");
315                 Flow flow = (Flow) entry.getValue();
316                 boolean status = validate(flow);
317                 if (!status) {
318                     return;
319                 }
320                 addFlow(entry.getKey(), (Flow) entry.getValue());
321             }
322         }
323         for (@SuppressWarnings("unused")
324         Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
325             if (entry.getValue() instanceof Flow) {
326                 logger.debug("Coming update cc in FlowDatacommitHandler");
327                 Flow flow = (Flow) entry.getValue();
328                 boolean status = validate(flow);
329                 if (!status) {
330                     return;
331                 }
332                 updateFlow(entry.getKey(), (Flow) entry.getValue());
333             }
334         }
335
336         for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
337             DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
338             if (removeValue instanceof Flow) {
339                 logger.debug("Coming remove cc in FlowDatacommitHandler");
340                 Flow flow = (Flow) removeValue;
341                 boolean status = validate(flow);
342                 if (!status) {
343                     return;
344                 }
345                 removeFlow(instanceId, (Flow) removeValue);
346
347             }
348         }
349
350     }
351
352     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
353
354         @SuppressWarnings("unchecked")
355         @Override
356         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
357             // We should verify transaction
358             logger.debug("Coming in FlowDatacommitHandler");
359             internalTransaction transaction = new internalTransaction(modification);
360             transaction.prepareUpdate();
361             return transaction;
362         }
363     }
364
365     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
366
367         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
368
369         @Override
370         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
371             return modification;
372         }
373
374         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
375             this.modification = modification;
376         }
377
378         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
379         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
380         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
381
382         /**
383          * We create a plan which flows will be added, which will be updated and
384          * which will be removed based on our internal state.
385          *
386          */
387         void prepareUpdate() {
388
389             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
390             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
391             }
392
393             // removals = modification.getRemovedConfigurationData();
394             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
395             for (InstanceIdentifier<?> removal : removedData) {
396                 DataObject value = modification.getOriginalConfigurationData().get(removal);
397                 if (value instanceof Flow) {
398                     removals.put(removal, (Flow) value);
399                 }
400             }
401
402         }
403
404         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
405             Flow original = originalSwView.get(key);
406             if (original != null) {
407                 // It is update for us
408                 updates.put(key, flow);
409             } else {
410                 // It is addition for us
411                 additions.put(key, flow);
412             }
413         }
414
415         /**
416          * We are OK to go with execution of plan
417          *
418          */
419         @Override
420         public RpcResult<Void> finish() throws IllegalStateException {
421
422             commitToPlugin(this);
423             // We return true if internal transaction is successful.
424             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
425             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
426         }
427
428         /**
429          *
430          * We should rollback our preparation
431          *
432          */
433         @Override
434         public RpcResult<Void> rollback() throws IllegalStateException {
435             // NOOP - we did not modified any internal state during
436             // requestCommit phase
437             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
438             return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
439
440         }
441
442         private boolean flowEntryExists(Flow flow) {
443             // Flow name has to be unique on per table id basis
444             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
445                 if (entry.getValue().getFlowName().equals(flow.getFlowName())
446                         && entry.getValue().getTableId().equals(flow.getTableId())) {
447                     return true;
448                 }
449             }
450             return false;
451         }
452     }
453
454     final class FlowEventListener implements SalFlowListener {
455
456         List<FlowAdded> addedFlows = new ArrayList<>();
457         List<FlowRemoved> removedFlows = new ArrayList<>();
458         List<FlowUpdated> updatedFlows = new ArrayList<>();
459
460         @Override
461         public void onFlowAdded(FlowAdded notification) {
462             addedFlows.add(notification);
463         }
464
465         @Override
466         public void onFlowRemoved(FlowRemoved notification) {
467             removedFlows.add(notification);
468         }
469
470         @Override
471         public void onFlowUpdated(FlowUpdated notification) {
472             updatedFlows.add(notification);
473         }
474
475         @Override
476         public void onNodeErrorNotification(NodeErrorNotification notification) {
477             // TODO Auto-generated method stub
478
479         }
480
481         @Override
482         public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
483             // TODO Auto-generated method stub
484
485         }
486
487         @Override
488         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
489             // TODO Auto-generated method stub
490
491         }
492
493     }
494
    // Commented out DataChangeListener - to be used by Stats
496
497     // final class FlowDataListener implements DataChangeListener {
498     // private SalFlowService flowService;
499     //
500     // public FlowDataListener() {
501     //
502     // }
503     //
504     // @Override
505     // public void onDataChanged(
506     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
507     // System.out.println("Coming in onDataChange..............");
508     // @SuppressWarnings("unchecked")
509     // Collection<DataObject> additions = (Collection<DataObject>)
510     // change.getCreatedConfigurationData();
511     // // we can check for getCreated, getDeleted or getUpdated from DataChange
512     // Event class
513     // for (DataObject dataObject : additions) {
514     // if (dataObject instanceof NodeFlow) {
515     // NodeRef nodeOne = createNodeRef("foo:node:1");
516     // // validating the dataObject here
517     // AddFlowInputBuilder input = new AddFlowInputBuilder();
518     // input.setNode(((NodeFlow) dataObject).getNode());
519     // input.setNode(nodeOne);
520     // // input.setPriority(((NodeFlow) dataObject).getPriority());
521     // //input.setMatch(((NodeFlow) dataObject).getMatch());
522     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
523     // //input.setCookie(((NodeFlow) dataObject).getCookie());
524     // //input.setAction(((NodeFlow) dataObject).getAction());
525     //
526     // @SuppressWarnings("unused")
527     // Future<RpcResult<java.lang.Void>> result =
528     // flowService.addFlow(input.build());
529     // }
530     // }
531     // }
532     // }
533
534     public boolean validate(Flow flow) {
535
536         String msg = ""; // Specific part of warn/error log
537
538         boolean result = true;
539         // flow Name validation
540         if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty() || !flow.getFlowName().matches(NAMEREGEX)) {
541             msg = "Invalid Flow name";
542             result = false;
543         }
544         // Node Validation
545         if (result == true && flow.getNode() == null) {
546             msg = "Node is null";
547             result = false;
548         }
549
550         // TODO: Validate we are seeking to program a flow against a valid
551         // Node
552
553         if (result == true && flow.getPriority() != null) {
554             if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
555                 msg = String.format("priority %s is not in the range 0 - 65535", flow.getPriority());
556                 result = false;
557             }
558         }
559
560         // Presence check
561         /*
562          * This is breaking due to some improper use of caches...
563          *
564          * if (flowEntryExists(flow)) { String error =
565          * "Entry with this name on specified table already exists";
566          * logger.warn(
567          * "Entry with this name on specified table already exists: {}" ,
568          * entry); logger.error(error); return; } if
569          * (originalSwView.containsKey(entry)) { logger.warn(
570          * "Operation Rejected: A flow with same match and priority exists on the target node"
571          * ); logger.trace("Aborting to install {}", entry); continue; }
572          */
573         if (!FRMUtil.validateMatch(flow)) {
574             logger.error("Not a valid Match");
575             result = false;
576         }
577         if (!FRMUtil.validateInstructions(flow)) {
578             logger.error("Not a valid Instruction");
579             result = false;
580         }
581         if (result == false) {
582             logger.warn("Invalid Configuration for flow {}. The failure is {}", flow, msg);
583             logger.error("Invalid Configuration ({})", msg);
584         }
585         return result;
586     }
587
588     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
589
590         updateSwViewes(entry, add);
591
592         updateNodeFlowsDB(entry, add);
593
594     }
595
596     /*
597      * Update the node mapped flows database
598      */
599     private static void updateSwViewes(NodeFlow entry, boolean add) {
600         if (add) {
601             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
602             installedSwView.put((FlowKey) entry, (Flow) entry);
603         } else {
604             originalSwView.remove(entry);
605             installedSwView.remove(entry);
606
607         }
608     }
609
610     @Override
611     public List<DataObject> get() {
612
613         List<DataObject> orderedList = new ArrayList<DataObject>();
614         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
615         int maxKey = staticFlowsOrdinal.get(0).intValue();
616         for (int i = 0; i <= maxKey; i++) {
617             Flow entry = flowMap.get(i);
618             if (entry != null) {
619                 orderedList.add(entry);
620             }
621         }
622         return orderedList;
623     }
624
625     @Override
626     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
627         if (this instanceof FlowConsumerImpl) {
628             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
629                 Flow flow = flowEntry.getValue();
630                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
631
632                     return flowEntry.getValue();
633                 }
634             }
635         }
636         return null;
637     }
638
639     /*
640      * Update the node mapped flows database
641      */
642     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
643         Node node = (Node) entry.getNode();
644
645         List<Flow> nodeIndeces = nodeFlows.get(node);
646         if (nodeIndeces == null) {
647             if (!add) {
648                 return;
649             } else {
650                 nodeIndeces = new ArrayList<Flow>();
651             }
652         }
653
654         if (add) {
655             nodeIndeces.add((Flow) entry);
656         } else {
657             nodeIndeces.remove(entry);
658         }
659
660         // Update cache across cluster
661         if (nodeIndeces.isEmpty()) {
662             nodeFlows.remove(node);
663         } else {
664             nodeFlows.put(node, nodeIndeces);
665         }
666     }
667
668     private static NodeRef createNodeRef(String string) {
669         NodeKey key = new NodeKey(new NodeId(string));
670         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
671                 .toInstance();
672
673         return new NodeRef(path);
674     }
675 }