Merge "Fixes - config-registry registered to OSGi, used modules's factory bundle...
[controller.git] / opendaylight / md-sal / forwardingrules-manager / src / main / java / org / opendaylight / controller / forwardingrulesmanager / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Map.Entry;
11 import java.util.Set;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
15
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.opendaylight.yangtools.yang.binding.NotificationListener;
54 import org.opendaylight.yangtools.yang.common.RpcError;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class FlowConsumerImpl implements IForwardingRulesManager {
60     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
61     private final FlowEventListener flowEventListener = new FlowEventListener();
62     private Registration<NotificationListener> listener1Reg;
63     private SalFlowService flowService;
64     // private FlowDataListener listener;
65     private FlowDataCommitHandler commitHandler;
66     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
67     private static ConcurrentMap<FlowKey, Flow> installedSwView;
68     private IClusterContainerServices clusterContainerService = null;
69     private IContainer container;
70     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
71     private static ConcurrentMap<Integer, Flow> staticFlows;
72     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
73     /*
74      * Inactive flow list. This is for the global instance of FRM It will
75      * contain all the flow entries which were installed on the global container
76      * when the first container is created.
77      */
78     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
79
80     /*
81      * /* Per node indexing
82      */
83     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
84     private boolean inContainerMode; // being used by global instance only
85
86     public FlowConsumerImpl() {
87         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
88         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
89
90         if (null == flowService) {
91             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
92             System.out.println("Consumer SAL Service is down or NULL.");
93             return;
94         }
95
96         // listener = new FlowDataListener();
97
98         // if (null ==
99         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
100         // listener)) {
101         // logger.error("Failed to listen on flow data modifcation events");
102         // System.out.println("Consumer SAL Service is down or NULL.");
103         // return;
104         // }
105
106         // For switch events
107         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
108
109         if (null == listener1Reg) {
110             logger.error("Listener to listen on flow data modifcation events");
111             System.out.println("Consumer SAL Service is down or NULL.");
112             return;
113         }
114         // addFlowTest();
115         System.out.println("-------------------------------------------------------------------");
116         commitHandler = new FlowDataCommitHandler();
117         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
118         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
119                 IClusterContainerServices.class, this);
120         allocateCaches();
121         /*
122          * If we are not the first cluster node to come up, do not initialize
123          * the static flow entries ordinal
124          */
125         if (staticFlowsOrdinal.size() == 0) {
126             staticFlowsOrdinal.put(0, Integer.valueOf(0));
127         }
128     }
129
130     private void allocateCaches() {
131
132         if (this.clusterContainerService == null) {
133             logger.warn("Un-initialized clusterContainerService, can't create cache");
134             return;
135         }
136
137         try {
138             clusterContainerService.createCache("frm.originalSwView",
139                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140             clusterContainerService.createCache("frm.installedSwView",
141                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142             clusterContainerService
143                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144             clusterContainerService.createCache("frm.staticFlowsOrdinal",
145                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146             clusterContainerService.createCache("frm.inactiveFlows",
147                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150         } catch (CacheConfigException cce) {
151             logger.error("CacheConfigException");
152         } catch (CacheExistException cce) {
153             logger.error("CacheExistException");
154         }
155     }
156
157     private void addFlowTest() {
158         try {
159             NodeRef nodeOne = createNodeRef("foo:node:1");
160             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
161
162             input1.setNode(nodeOne);
163             AddFlowInput firstMsg = input1.build();
164
165             if (null != flowService) {
166                 System.out.println(flowService.toString());
167             } else {
168                 System.out.println("ConsumerFlowService is NULL");
169             }
170             @SuppressWarnings("unused")
171             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
172
173         } catch (Exception e) {
174             // TODO Auto-generated catch block
175             e.printStackTrace();
176         }
177     }
178
179     /**
180      * Adds flow to the southbound plugin and our internal database
181      *
182      * @param path
183      * @param dataObject
184      */
185     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
186
187         AddFlowInputBuilder input = new AddFlowInputBuilder();
188         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
189         input.setNode((dataObject).getNode());
190         input.setPriority((dataObject).getPriority());
191         input.setMatch((dataObject).getMatch());
192         input.setCookie((dataObject).getCookie());
193         input.setInstructions((dataObject).getInstructions());
194         dataObject.getMatch().getLayer3Match();
195         for (int i = 0; i < inst.size(); i++) {
196             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
197             System.out.println("i = " + i + inst.get(i).toString());
198         }
199
200         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
201
202         // updating the staticflow cache
203         /*
204          *  Commented out... as in many other places... use of ClusteringServices is breaking things
205          *  insufficient time to debug
206         Integer ordinal = staticFlowsOrdinal.get(0);
207         staticFlowsOrdinal.put(0, ++ordinal);
208         staticFlows.put(ordinal, dataObject);
209         */
210
211         // We send flow to the sounthbound plugin
212         flowService.addFlow(input.build());
213         /*
214          * Commented out as this will also break due to improper use of ClusteringServices
215         updateLocalDatabase((NodeFlow) dataObject, true);
216         */
217     }
218
219     /**
220      * Removes flow to the southbound plugin and our internal database
221      *
222      * @param path
223      * @param dataObject
224      */
225     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
226
227         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
228         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
229         input.setNode((dataObject).getNode());
230         input.setPriority((dataObject).getPriority());
231         input.setMatch((dataObject).getMatch());
232         input.setCookie((dataObject).getCookie());
233         input.setInstructions((dataObject).getInstructions());
234         dataObject.getMatch().getLayer3Match();
235         for (int i = 0; i < inst.size(); i++) {
236             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
237             System.out.println("i = " + i + inst.get(i).toString());
238         }
239
240         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
241
242         // updating the staticflow cache
243         /*
244          * Commented out due to problems caused by improper use of ClusteringServices
245         Integer ordinal = staticFlowsOrdinal.get(0);
246         staticFlowsOrdinal.put(0, ++ordinal);
247         staticFlows.put(ordinal, dataObject);
248         */
249
250         // We send flow to the sounthbound plugin
251         flowService.removeFlow(input.build());
252         /*
253          * Commented out due to problems caused by improper use of ClusteringServices
254         updateLocalDatabase((NodeFlow) dataObject, false);
255         */
256     }
257
258     /**
259      * Update flow to the southbound plugin and our internal database
260      *
261      * @param path
262      * @param dataObject
263      */
264     private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
265
266         UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
267         UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
268         updatedflowbuilder.fieldsFrom(dataObject);
269         input.setNode(dataObject.getNode());
270         input.setUpdatedFlow(updatedflowbuilder.build());
271
272         // updating the staticflow cache
273         /*
274          * Commented out due to problems caused by improper use of ClusteringServices.
275         Integer ordinal = staticFlowsOrdinal.get(0);
276         staticFlowsOrdinal.put(0, ++ordinal);
277         staticFlows.put(ordinal, dataObject);
278         */
279
280         // We send flow to the sounthbound plugin
281         flowService.updateFlow(input.build());
282         /*
283          * Commented out due to problems caused by improper use of ClusteringServices.
284         updateLocalDatabase((NodeFlow) dataObject, true);
285         */
286     }
287
288     @SuppressWarnings("unchecked")
289     private void commitToPlugin(internalTransaction transaction) {
290         Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification().getCreatedConfigurationData().entrySet();
291
292         /*
293          * This little dance is because updatedEntries contains both created and modified entries
294          * The reason I created a new HashSet is because the collections we are returned are immutable.
295          */
296         Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
297         updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
298         updatedEntries.removeAll(createdEntries);
299
300         Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification().getRemovedConfigurationData();
301         transaction.getModification().getOriginalConfigurationData();
302         for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
303             if(entry.getValue() instanceof Flow) {
304                 System.out.println("Coming add cc in FlowDatacommitHandler");
305                 addFlow(entry.getKey(), (Flow) entry.getValue());
306             }
307         }
308         for (@SuppressWarnings("unused")
309         Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
310             if(entry.getValue() instanceof Flow) {
311                 System.out.println("Coming update cc in FlowDatacommitHandler");
312                 updateFlow(entry.getKey(), (Flow) entry.getValue());
313             }
314         }
315
316         for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
317             DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
318             if(removeValue instanceof Flow) {
319                 System.out.println("Coming remove cc in FlowDatacommitHandler");
320                 removeFlow(instanceId, (Flow) removeValue);
321
322             }
323         }
324
325     }
326
327     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
328
329         @SuppressWarnings("unchecked")
330         @Override
331         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
332             // We should verify transaction
333             System.out.println("Coming in FlowDatacommitHandler");
334             internalTransaction transaction = new internalTransaction(modification);
335             transaction.prepareUpdate();
336             return transaction;
337         }
338     }
339
340     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
341
342         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
343
344         @Override
345         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
346             return modification;
347         }
348
349         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
350             this.modification = modification;
351         }
352
353         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
354         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
355         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
356
357         /**
358          * We create a plan which flows will be added, which will be updated and
359          * which will be removed based on our internal state.
360          *
361          */
362         void prepareUpdate() {
363
364             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
365             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
366
367                 // validating the DataObject
368                 DataObject value = entry.getValue();
369                 if(value instanceof Flow ) {
370                     Flow flow = (Flow)value;
371                     boolean status = validate(flow);
372                     if (!status) {
373                         return;
374                     }
375                     // Presence check
376                     /*
377                      * This is breaking due to some improper use of caches...
378                      *
379                     if (flowEntryExists(flow)) {
380                         String error = "Entry with this name on specified table already exists";
381                         logger.warn("Entry with this name on specified table already exists: {}", entry);
382                         logger.error(error);
383                         return;
384                     }
385                     if (originalSwView.containsKey(entry)) {
386                         logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
387                         logger.trace("Aborting to install {}", entry);
388                         continue;
389                     }
390                     */
391                     if (!FRMUtil.validateMatch(flow)) {
392                         logger.error("Not a valid Match");
393                         return;
394                     }
395                     if (!FRMUtil.validateInstructions(flow)) {
396                         logger.error("Not a valid Instruction");
397                         return;
398                     }
399                     /*
400                      * Commented out due to Clustering Services issues
401                      * preparePutEntry(entry.getKey(), flow);
402                      */
403                 }
404             }
405
406             // removals = modification.getRemovedConfigurationData();
407             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
408             for (InstanceIdentifier<?> removal : removedData) {
409                 DataObject value = modification.getOriginalConfigurationData().get(removal);
410                 if (value instanceof Flow) {
411                     removals.put(removal, (Flow) value);
412                 }
413             }
414
415         }
416
417         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
418             Flow original = originalSwView.get(key);
419             if (original != null) {
420                 // It is update for us
421                 System.out.println("Coming update  in FlowDatacommitHandler");
422                 updates.put(key, flow);
423             } else {
424                 // It is addition for us
425                 System.out.println("Coming add in FlowDatacommitHandler");
426                 additions.put(key, flow);
427             }
428         }
429
430         /**
431          * We are OK to go with execution of plan
432          *
433          */
434         @Override
435         public RpcResult<Void> finish() throws IllegalStateException {
436
437             commitToPlugin(this);
438             // We return true if internal transaction is successful.
439             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
440             return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
441         }
442
443         /**
444          *
445          * We should rollback our preparation
446          *
447          */
448         @Override
449         public RpcResult<Void> rollback() throws IllegalStateException {
450             // NOOP - we did not modified any internal state during
451             // requestCommit phase
452             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
453             return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
454
455         }
456
457         public boolean validate(Flow flow) {
458
459             String msg = ""; // Specific part of warn/error log
460
461             boolean result  = true;
462             // flow Name validation
463             if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty()
464                     || !flow.getFlowName().matches(NAMEREGEX)) {
465                 msg = "Invalid Flow name";
466                 result = false;
467             }
468             // Node Validation
469             if (result == true && flow.getNode() == null) {
470                 msg = "Node is null";
471                 result = false;
472             }
473
474             // TODO: Validate we are seeking to program a flow against a valid Node
475
476             if (result == true && flow.getPriority() != null) {
477                 if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
478                     msg = String.format("priority %s is not in the range 0 - 65535",
479                             flow.getPriority());
480                     result = false;
481                 }
482             }
483             if (result == false) {
484                 logger.warn("Invalid Configuration for flow {}. The failure is {}",flow,msg);
485                 logger.error("Invalid Configuration ({})",msg);
486             }
487             return result;
488         }
489
490         private boolean flowEntryExists(Flow flow) {
491             // Flow name has to be unique on per table id basis
492             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
493                 if (entry.getValue().getFlowName().equals(flow.getFlowName())
494                         && entry.getValue().getTableId().equals(flow.getTableId())) {
495                     return true;
496                 }
497             }
498             return false;
499         }
500     }
501
502     final class FlowEventListener implements SalFlowListener {
503
504         List<FlowAdded> addedFlows = new ArrayList<>();
505         List<FlowRemoved> removedFlows = new ArrayList<>();
506         List<FlowUpdated> updatedFlows = new ArrayList<>();
507
508         @Override
509         public void onFlowAdded(FlowAdded notification) {
510             System.out.println("added flow..........................");
511             addedFlows.add(notification);
512         }
513
514         @Override
515         public void onFlowRemoved(FlowRemoved notification) {
516             removedFlows.add(notification);
517         };
518
519         @Override
520         public void onFlowUpdated(FlowUpdated notification) {
521             updatedFlows.add(notification);
522         }
523
524         @Override
525         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
526             // TODO
527         }
528
529         @Override
530         public void onNodeErrorNotification(NodeErrorNotification notification) {
531             // TODO Auto-generated method stub
532
533         }
534
535         @Override
536         public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
537             // TODO Auto-generated method stub
538
539         };
540
541     }
542
543     // Commented out DataChangeListene - to be used by Stats
544
545     // final class FlowDataListener implements DataChangeListener {
546     // private SalFlowService flowService;
547     //
548     // public FlowDataListener() {
549     //
550     // }
551     //
552     // @Override
553     // public void onDataChanged(
554     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
555     // System.out.println("Coming in onDataChange..............");
556     // @SuppressWarnings("unchecked")
557     // Collection<DataObject> additions = (Collection<DataObject>)
558     // change.getCreatedConfigurationData();
559     // // we can check for getCreated, getDeleted or getUpdated from DataChange
560     // Event class
561     // for (DataObject dataObject : additions) {
562     // if (dataObject instanceof NodeFlow) {
563     // NodeRef nodeOne = createNodeRef("foo:node:1");
564     // // validating the dataObject here
565     // AddFlowInputBuilder input = new AddFlowInputBuilder();
566     // input.setNode(((NodeFlow) dataObject).getNode());
567     // input.setNode(nodeOne);
568     // // input.setPriority(((NodeFlow) dataObject).getPriority());
569     // //input.setMatch(((NodeFlow) dataObject).getMatch());
570     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
571     // //input.setCookie(((NodeFlow) dataObject).getCookie());
572     // //input.setAction(((NodeFlow) dataObject).getAction());
573     //
574     // @SuppressWarnings("unused")
575     // Future<RpcResult<java.lang.Void>> result =
576     // flowService.addFlow(input.build());
577     // }
578     // }
579     // }
580     // }
581
582     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
583
584         updateSwViewes(entry, add);
585
586         updateNodeFlowsDB(entry, add);
587
588     }
589
590     /*
591      * Update the node mapped flows database
592      */
593     private static void updateSwViewes(NodeFlow entry, boolean add) {
594         if (add) {
595             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
596             installedSwView.put((FlowKey) entry, (Flow) entry);
597         } else {
598             originalSwView.remove(entry);
599             installedSwView.remove(entry);
600
601         }
602     }
603
604     @Override
605     public List<DataObject> get() {
606
607         List<DataObject> orderedList = new ArrayList<DataObject>();
608         ConcurrentMap<Integer, Flow> flowMap = staticFlows;
609         int maxKey = staticFlowsOrdinal.get(0).intValue();
610         for (int i = 0; i <= maxKey; i++) {
611             Flow entry = flowMap.get(i);
612             if (entry != null) {
613                 orderedList.add(entry);
614             }
615         }
616         return orderedList;
617     }
618
619     @Override
620     public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
621         if (this instanceof FlowConsumerImpl) {
622             for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
623                 Flow flow = flowEntry.getValue();
624                 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
625
626                     return flowEntry.getValue();
627                 }
628             }
629         }
630         return null;
631     }
632
633     /*
634      * Update the node mapped flows database
635      */
636     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
637         Node node = (Node) entry.getNode();
638
639         List<Flow> nodeIndeces = nodeFlows.get(node);
640         if (nodeIndeces == null) {
641             if (!add) {
642                 return;
643             } else {
644                 nodeIndeces = new ArrayList<Flow>();
645             }
646         }
647
648         if (add) {
649             nodeIndeces.add((Flow) entry);
650         } else {
651             nodeIndeces.remove(entry);
652         }
653
654         // Update cache across cluster
655         if (nodeIndeces.isEmpty()) {
656             nodeFlows.remove(node);
657         } else {
658             nodeFlows.put(node, nodeIndeces);
659         }
660     }
661
662     private static NodeRef createNodeRef(String string) {
663         NodeKey key = new NodeKey(new NodeId(string));
664         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
665                 .toInstance();
666
667         return new NodeRef(path);
668     }
669 }