Flow removed switch event. Group and Meter update RPC
[controller.git] / opendaylight / forwardingrulesmanager_mdsal / openflow / src / main / java / org / opendaylight / controller / forwardingrulesmanager_mdsal / consumer / impl / FlowConsumerImpl.java
1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
2
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Map.Entry;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.Future;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.utils.GlobalConstants;
24 import org.opendaylight.controller.sal.utils.ServiceHelper;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.controller.switchmanager.Switch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.NotificationListener;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 public class FlowConsumerImpl {
58     protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
59     private FlowEventListener flowEventListener = new FlowEventListener();
60     private Registration<NotificationListener> listener1Reg;
61     private SalFlowService flowService;
62     // private FlowDataListener listener;
63     private FlowDataCommitHandler commitHandler;
64     private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
65     private static ConcurrentMap<FlowKey, Flow> installedSwView;
66     private IClusterContainerServices clusterContainerService = null;
67     private IContainer container;
68     private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
69     private static ConcurrentMap<Integer, Flow> staticFlows;
70     private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
71     /*
72      * Inactive flow list. This is for the global instance of FRM It will
73      * contain all the flow entries which were installed on the global container
74      * when the first container is created.
75      */
76     private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
77
78     /*
79      * /* Per node indexing
80      */
81     private static ConcurrentMap<Node, List<Flow>> nodeFlows;
82     private boolean inContainerMode; // being used by global instance only
83
84     public FlowConsumerImpl() {
85         InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
86         flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
87
88         if (null == flowService) {
89             logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
90             System.out.println("Consumer SAL Service is down or NULL.");
91             return;
92         }
93
94         // listener = new FlowDataListener();
95
96         // if (null ==
97         // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
98         // listener)) {
99         // logger.error("Failed to listen on flow data modifcation events");
100         // System.out.println("Consumer SAL Service is down or NULL.");
101         // return;
102         // }
103
104         // For switch events
105         listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
106
107         if (null == listener1Reg) {
108             logger.error("Listener to listen on flow data modifcation events");
109             System.out.println("Consumer SAL Service is down or NULL.");
110             return;
111         }
112         // addFlowTest();
113         System.out.println("-------------------------------------------------------------------");
114         allocateCaches();
115         commitHandler = new FlowDataCommitHandler();
116         FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
117         clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
118                 IClusterContainerServices.class, this);
119         container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
120         /*
121          * If we are not the first cluster node to come up, do not initialize
122          * the static flow entries ordinal
123          */
124         if (staticFlowsOrdinal.size() == 0) {
125             staticFlowsOrdinal.put(0, Integer.valueOf(0));
126         }
127     }
128
129     private void allocateCaches() {
130
131         if (this.clusterContainerService == null) {
132             logger.warn("Un-initialized clusterContainerService, can't create cache");
133             return;
134         }
135
136         try {
137             clusterContainerService.createCache("frm.originalSwView",
138                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
139             clusterContainerService.createCache("frm.installedSwView",
140                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
141             clusterContainerService
142                     .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
143             clusterContainerService.createCache("frm.staticFlowsOrdinal",
144                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145             clusterContainerService.createCache("frm.inactiveFlows",
146                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
147             clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148             clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149         } catch (CacheConfigException cce) {
150             logger.error("CacheConfigException");
151         } catch (CacheExistException cce) {
152             logger.error("CacheExistException");
153         }
154     }
155
156     private void addFlowTest() {
157         try {
158             NodeRef nodeOne = createNodeRef("foo:node:1");
159             AddFlowInputBuilder input1 = new AddFlowInputBuilder();
160
161             input1.setNode(nodeOne);
162             AddFlowInput firstMsg = input1.build();
163
164             if (null != flowService) {
165                 System.out.println(flowService.toString());
166             } else {
167                 System.out.println("ConsumerFlowService is NULL");
168             }
169             @SuppressWarnings("unused")
170             Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
171
172         } catch (Exception e) {
173             // TODO Auto-generated catch block
174             e.printStackTrace();
175         }
176     }
177
178     /**
179      * Adds flow to the southbound plugin and our internal database
180      *
181      * @param path
182      * @param dataObject
183      */
184     private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
185
186         AddFlowInputBuilder input = new AddFlowInputBuilder();
187         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
188         input.setNode((dataObject).getNode());
189         input.setPriority((dataObject).getPriority());
190         input.setMatch((dataObject).getMatch());
191         input.setCookie((dataObject).getCookie());
192         input.setInstructions((dataObject).getInstructions());
193         dataObject.getMatch().getLayer3Match();
194         for (int i = 0; i < inst.size(); i++) {
195             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
196             System.out.println("i = " + i + inst.get(i).toString());
197         }
198
199         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
200
201         // updating the staticflow cache
202         Integer ordinal = staticFlowsOrdinal.get(0);
203         staticFlowsOrdinal.put(0, ++ordinal);
204         staticFlows.put(ordinal, (Flow) dataObject);
205
206         // We send flow to the sounthbound plugin
207         flowService.addFlow(input.build());
208         updateLocalDatabase((NodeFlow) dataObject, true);
209     }
210
211     /**
212      * Removes flow to the southbound plugin and our internal database
213      *
214      * @param path
215      * @param dataObject
216      */
217     private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
218
219         RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
220         List<Instruction> inst = (dataObject).getInstructions().getInstruction();
221         input.setNode((dataObject).getNode());
222         input.setPriority((dataObject).getPriority());
223         input.setMatch((dataObject).getMatch());
224         input.setCookie((dataObject).getCookie());
225         input.setInstructions((dataObject).getInstructions());
226         dataObject.getMatch().getLayer3Match();
227         for (int i = 0; i < inst.size(); i++) {
228             System.out.println("i = " + i + inst.get(i).getInstruction().toString());
229             System.out.println("i = " + i + inst.get(i).toString());
230         }
231
232         System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
233
234         // updating the staticflow cache
235         Integer ordinal = staticFlowsOrdinal.get(0);
236         staticFlowsOrdinal.put(0, ++ordinal);
237         staticFlows.put(ordinal, dataObject);
238
239         // We send flow to the sounthbound plugin
240         flowService.removeFlow(input.build());
241         updateLocalDatabase((NodeFlow) dataObject, false);
242     }
243
244     @SuppressWarnings("unchecked")
245     private void commitToPlugin(internalTransaction transaction) {
246         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
247             System.out.println("Coming add cc in FlowDatacommitHandler");
248             addFlow(entry.getKey(), entry.getValue());
249         }
250         for (@SuppressWarnings("unused")
251         Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
252             System.out.println("Coming update cc in FlowDatacommitHandler");
253             // updateFlow(entry.getKey(),entry.getValue());
254         }
255
256         for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
257             System.out.println("Coming remove cc in FlowDatacommitHandler");
258             removeFlow(entry.getKey(), entry.getValue());
259         }
260
261     }
262
263     private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
264
265         @SuppressWarnings("unchecked")
266         @Override
267         public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
268             // We should verify transaction
269             System.out.println("Coming in FlowDatacommitHandler");
270             internalTransaction transaction = new internalTransaction(modification);
271             transaction.prepareUpdate();
272             return transaction;
273         }
274     }
275
276     private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
277
278         private final DataModification<InstanceIdentifier<?>, DataObject> modification;
279
280         @Override
281         public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
282             return modification;
283         }
284
285         public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
286             this.modification = modification;
287         }
288
289         Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
290         Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
291         Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
292
293         /**
294          * We create a plan which flows will be added, which will be updated and
295          * which will be removed based on our internal state.
296          *
297          */
298         void prepareUpdate() {
299
300             Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
301             for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
302
303                 // validating the DataObject
304
305                 Status status = validate(container, (NodeFlow) entry);
306                 if (!status.isSuccess()) {
307                     logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
308                     String error = "Invalid Configuration (" + status.getDescription() + ")";
309                     logger.error(error);
310                     return;
311                 }
312                 // Presence check
313                 if (flowEntryExists((NodeFlow) entry)) {
314                     String error = "Entry with this name on specified table already exists";
315                     logger.warn("Entry with this name on specified table already exists: {}", entry);
316                     logger.error(error);
317                     return;
318                 }
319                 if (originalSwView.containsKey((FlowKey) entry)) {
320                     logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
321                     logger.trace("Aborting to install {}", entry);
322                     continue;
323                 }
324                 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
325                     logger.error("Not a valid Match");
326                     return;
327                 }
328                 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
329                     logger.error("Not a valid Instruction");
330                     return;
331                 }
332                 if (entry.getValue() instanceof Flow) {
333                     Flow flow = (Flow) entry.getValue();
334                     preparePutEntry(entry.getKey(), flow);
335                 }
336
337             }
338
339             // removals = modification.getRemovedConfigurationData();
340             Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
341             for (InstanceIdentifier<?> removal : removedData) {
342                 DataObject value = modification.getOriginalConfigurationData().get(removal);
343                 if (value instanceof Flow) {
344                     removals.put(removal, (Flow) value);
345                 }
346             }
347
348         }
349
350         private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
351             Flow original = originalSwView.get(key);
352             if (original != null) {
353                 // It is update for us
354                 System.out.println("Coming update  in FlowDatacommitHandler");
355                 updates.put(key, flow);
356             } else {
357                 // It is addition for us
358                 System.out.println("Coming add in FlowDatacommitHandler");
359                 additions.put(key, flow);
360             }
361         }
362
363         /**
364          * We are OK to go with execution of plan
365          *
366          */
367         @Override
368         public RpcResult<Void> finish() throws IllegalStateException {
369
370             commitToPlugin(this);
371             // We return true if internal transaction is successful.
372             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
373             return Rpcs.getRpcResult(true, null, null);
374         }
375
376         /**
377          *
378          * We should rollback our preparation
379          *
380          */
381         @Override
382         public RpcResult<Void> rollback() throws IllegalStateException {
383             // NOOP - we did not modified any internal state during
384             // requestCommit phase
385             // return Rpcs.getRpcResult(true, null, Collections.emptySet());
386             return Rpcs.getRpcResult(true, null, null);
387
388         }
389
390         public Status validate(IContainer container, NodeFlow dataObject) {
391
392             // container validation
393             Switch sw = null;
394             Node node = null;
395             String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
396             ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
397                     containerName, this);
398             // flow Name validation
399             if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
400                     || !dataObject.getFlowName().matches(NAMEREGEX)) {
401                 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
402             }
403             // Node Validation
404             if (dataObject.getNode() == null) {
405                 return new Status(StatusCode.BADREQUEST, "Node is null");
406             }
407
408             if (switchManager != null) {
409                 for (Switch device : switchManager.getNetworkDevices()) {
410                     node = (Node) device.getNode();
411                     if (device.getNode().equals(dataObject.getNode())) {
412                         sw = device;
413                         break;
414                     }
415                 }
416                 if (sw == null) {
417                     return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
418                 }
419             } else {
420                 logger.debug("switchmanager is not set yet");
421             }
422
423             if (dataObject.getPriority() != null) {
424                 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
425                     return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
426                             dataObject.getPriority()));
427                 }
428             }
429
430             return new Status(StatusCode.SUCCESS);
431         }
432
433         private boolean flowEntryExists(NodeFlow config) {
434             // Flow name has to be unique on per table id basis
435             for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
436                 if (entry.getValue().getFlowName().equals(config.getFlowName())
437                         && entry.getValue().getTableId().equals(config.getTableId())) {
438                     return true;
439                 }
440             }
441             return false;
442         }
443     }
444
445     final class FlowEventListener implements SalFlowListener {
446
447         List<FlowAdded> addedFlows = new ArrayList<>();
448         List<FlowRemoved> removedFlows = new ArrayList<>();
449         List<FlowUpdated> updatedFlows = new ArrayList<>();
450
451         @Override
452         public void onFlowAdded(FlowAdded notification) {
453             System.out.println("added flow..........................");
454             addedFlows.add(notification);
455         }
456
457         @Override
458         public void onFlowRemoved(FlowRemoved notification) {
459             removedFlows.add(notification);
460         };
461
462         @Override
463         public void onFlowUpdated(FlowUpdated notification) {
464             updatedFlows.add(notification);
465         }
466         
467         @Override
468         public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
469             //TODO
470         };
471
472     }
473
474     // Commented out DataChangeListene - to be used by Stats
475
476     // final class FlowDataListener implements DataChangeListener {
477     // private SalFlowService flowService;
478     //
479     // public FlowDataListener() {
480     //
481     // }
482     //
483     // @Override
484     // public void onDataChanged(
485     // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
486     // System.out.println("Coming in onDataChange..............");
487     // @SuppressWarnings("unchecked")
488     // Collection<DataObject> additions = (Collection<DataObject>)
489     // change.getCreatedConfigurationData();
490     // // we can check for getCreated, getDeleted or getUpdated from DataChange
491     // Event class
492     // for (DataObject dataObject : additions) {
493     // if (dataObject instanceof NodeFlow) {
494     // NodeRef nodeOne = createNodeRef("foo:node:1");
495     // // validating the dataObject here
496     // AddFlowInputBuilder input = new AddFlowInputBuilder();
497     // input.setNode(((NodeFlow) dataObject).getNode());
498     // input.setNode(nodeOne);
499     // // input.setPriority(((NodeFlow) dataObject).getPriority());
500     // //input.setMatch(((NodeFlow) dataObject).getMatch());
501     // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
502     // //input.setCookie(((NodeFlow) dataObject).getCookie());
503     // //input.setAction(((NodeFlow) dataObject).getAction());
504     //
505     // @SuppressWarnings("unused")
506     // Future<RpcResult<java.lang.Void>> result =
507     // flowService.addFlow(input.build());
508     // }
509     // }
510     // }
511     // }
512
513     private static void updateLocalDatabase(NodeFlow entry, boolean add) {
514
515         updateSwViewes(entry, add);
516
517         updateNodeFlowsDB(entry, add);
518
519     }
520
521     /*
522      * Update the node mapped flows database
523      */
524     private static void updateSwViewes(NodeFlow entry, boolean add) {
525         if (add) {
526             FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
527             installedSwView.put((FlowKey) entry, (Flow) entry);
528         } else {
529             originalSwView.remove((Flow) entry);
530             installedSwView.remove((FlowKey) entry);
531
532         }
533     }
534
535     /*
536      * Update the node mapped flows database
537      */
538     private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
539         Node node = (Node) entry.getNode();
540
541         List<Flow> nodeIndeces = nodeFlows.get(node);
542         if (nodeIndeces == null) {
543             if (!add) {
544                 return;
545             } else {
546                 nodeIndeces = new ArrayList<Flow>();
547             }
548         }
549
550         if (add) {
551             nodeIndeces.add((Flow) entry);
552         } else {
553             nodeIndeces.remove((Flow) entry);
554         }
555
556         // Update cache across cluster
557         if (nodeIndeces.isEmpty()) {
558             nodeFlows.remove(node);
559         } else {
560             nodeFlows.put(node, nodeIndeces);
561         }
562     }
563
564     private static NodeRef createNodeRef(String string) {
565         NodeKey key = new NodeKey(new NodeId(string));
566         InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
567                 .toInstance();
568
569         return new NodeRef(path);
570     }
571 }