1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
8 import java.util.Map.Entry;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.Future;
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.utils.GlobalConstants;
24 import org.opendaylight.controller.sal.utils.ServiceHelper;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.controller.switchmanager.Switch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
53 import org.opendaylight.yangtools.concepts.Registration;
54 import org.opendaylight.yangtools.yang.binding.DataObject;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.binding.NotificationListener;
57 import org.opendaylight.yangtools.yang.common.RpcResult;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 public class FlowConsumerImpl implements IForwardingRulesManager {
62 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
63 private final FlowEventListener flowEventListener = new FlowEventListener();
64 private Registration<NotificationListener> listener1Reg;
65 private SalFlowService flowService;
66 // private FlowDataListener listener;
67 private FlowDataCommitHandler commitHandler;
68 private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
69 private static ConcurrentMap<FlowKey, Flow> installedSwView;
70 private IClusterContainerServices clusterContainerService = null;
71 private IContainer container;
72 private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
73 private static ConcurrentMap<Integer, Flow> staticFlows;
74 private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
76 * Inactive flow list. This is for the global instance of FRM It will
77 * contain all the flow entries which were installed on the global container
78 * when the first container is created.
80 private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
83 * /* Per node indexing
85 private static ConcurrentMap<Node, List<Flow>> nodeFlows;
86 private boolean inContainerMode; // being used by global instance only
88 public FlowConsumerImpl() {
89 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
90 flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
92 if (null == flowService) {
93 logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
94 System.out.println("Consumer SAL Service is down or NULL.");
98 // listener = new FlowDataListener();
101 // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
103 // logger.error("Failed to listen on flow data modifcation events");
104 // System.out.println("Consumer SAL Service is down or NULL.");
109 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
111 if (null == listener1Reg) {
112 logger.error("Listener to listen on flow data modifcation events");
113 System.out.println("Consumer SAL Service is down or NULL.");
117 System.out.println("-------------------------------------------------------------------");
119 commitHandler = new FlowDataCommitHandler();
120 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
121 clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
122 IClusterContainerServices.class, this);
123 container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
125 * If we are not the first cluster node to come up, do not initialize
126 * the static flow entries ordinal
128 if (staticFlowsOrdinal.size() == 0) {
129 staticFlowsOrdinal.put(0, Integer.valueOf(0));
133 private void allocateCaches() {
135 if (this.clusterContainerService == null) {
136 logger.warn("Un-initialized clusterContainerService, can't create cache");
141 clusterContainerService.createCache("frm.originalSwView",
142 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
143 clusterContainerService.createCache("frm.installedSwView",
144 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145 clusterContainerService
146 .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
147 clusterContainerService.createCache("frm.staticFlowsOrdinal",
148 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149 clusterContainerService.createCache("frm.inactiveFlows",
150 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
151 clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
152 clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
153 } catch (CacheConfigException cce) {
154 logger.error("CacheConfigException");
155 } catch (CacheExistException cce) {
156 logger.error("CacheExistException");
160 private void addFlowTest() {
162 NodeRef nodeOne = createNodeRef("foo:node:1");
163 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
165 input1.setNode(nodeOne);
166 AddFlowInput firstMsg = input1.build();
168 if (null != flowService) {
169 System.out.println(flowService.toString());
171 System.out.println("ConsumerFlowService is NULL");
173 @SuppressWarnings("unused")
174 Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
176 } catch (Exception e) {
177 // TODO Auto-generated catch block
183 * Adds flow to the southbound plugin and our internal database
188 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
190 AddFlowInputBuilder input = new AddFlowInputBuilder();
191 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
192 input.setNode((dataObject).getNode());
193 input.setPriority((dataObject).getPriority());
194 input.setMatch((dataObject).getMatch());
195 input.setCookie((dataObject).getCookie());
196 input.setInstructions((dataObject).getInstructions());
197 dataObject.getMatch().getLayer3Match();
198 for (int i = 0; i < inst.size(); i++) {
199 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
200 System.out.println("i = " + i + inst.get(i).toString());
203 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
205 // updating the staticflow cache
206 Integer ordinal = staticFlowsOrdinal.get(0);
207 staticFlowsOrdinal.put(0, ++ordinal);
208 staticFlows.put(ordinal, dataObject);
210 // We send flow to the sounthbound plugin
211 flowService.addFlow(input.build());
212 updateLocalDatabase((NodeFlow) dataObject, true);
216 * Removes flow to the southbound plugin and our internal database
221 private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
223 RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
224 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
225 input.setNode((dataObject).getNode());
226 input.setPriority((dataObject).getPriority());
227 input.setMatch((dataObject).getMatch());
228 input.setCookie((dataObject).getCookie());
229 input.setInstructions((dataObject).getInstructions());
230 dataObject.getMatch().getLayer3Match();
231 for (int i = 0; i < inst.size(); i++) {
232 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
233 System.out.println("i = " + i + inst.get(i).toString());
236 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
238 // updating the staticflow cache
239 Integer ordinal = staticFlowsOrdinal.get(0);
240 staticFlowsOrdinal.put(0, ++ordinal);
241 staticFlows.put(ordinal, dataObject);
243 // We send flow to the sounthbound plugin
244 flowService.removeFlow(input.build());
245 updateLocalDatabase((NodeFlow) dataObject, false);
249 * Update flow to the southbound plugin and our internal database
254 private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
256 UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
257 UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
258 updatedflowbuilder.fieldsFrom(dataObject);
259 input.setUpdatedFlow(updatedflowbuilder.build());
261 // updating the staticflow cache
262 Integer ordinal = staticFlowsOrdinal.get(0);
263 staticFlowsOrdinal.put(0, ++ordinal);
264 staticFlows.put(ordinal, dataObject);
266 // We send flow to the sounthbound plugin
267 flowService.updateFlow(input.build());
268 updateLocalDatabase((NodeFlow) dataObject, true);
271 @SuppressWarnings("unchecked")
272 private void commitToPlugin(internalTransaction transaction) {
273 for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
274 System.out.println("Coming add cc in FlowDatacommitHandler");
275 addFlow(entry.getKey(), entry.getValue());
277 for (@SuppressWarnings("unused")
278 Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
279 System.out.println("Coming update cc in FlowDatacommitHandler");
280 updateFlow(entry.getKey(), entry.getValue());
283 for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
284 System.out.println("Coming remove cc in FlowDatacommitHandler");
285 removeFlow(entry.getKey(), entry.getValue());
290 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
292 @SuppressWarnings("unchecked")
294 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
295 // We should verify transaction
296 System.out.println("Coming in FlowDatacommitHandler");
297 internalTransaction transaction = new internalTransaction(modification);
298 transaction.prepareUpdate();
303 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
305 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
308 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
312 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
313 this.modification = modification;
316 Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
317 Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
318 Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
321 * We create a plan which flows will be added, which will be updated and
322 * which will be removed based on our internal state.
325 void prepareUpdate() {
327 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
328 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
330 // validating the DataObject
332 Status status = validate(container, (NodeFlow) entry);
333 if (!status.isSuccess()) {
334 logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
335 String error = "Invalid Configuration (" + status.getDescription() + ")";
340 if (flowEntryExists((NodeFlow) entry)) {
341 String error = "Entry with this name on specified table already exists";
342 logger.warn("Entry with this name on specified table already exists: {}", entry);
346 if (originalSwView.containsKey(entry)) {
347 logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
348 logger.trace("Aborting to install {}", entry);
351 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
352 logger.error("Not a valid Match");
355 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
356 logger.error("Not a valid Instruction");
359 if (entry.getValue() instanceof Flow) {
360 Flow flow = (Flow) entry.getValue();
361 preparePutEntry(entry.getKey(), flow);
366 // removals = modification.getRemovedConfigurationData();
367 Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
368 for (InstanceIdentifier<?> removal : removedData) {
369 DataObject value = modification.getOriginalConfigurationData().get(removal);
370 if (value instanceof Flow) {
371 removals.put(removal, (Flow) value);
377 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
378 Flow original = originalSwView.get(key);
379 if (original != null) {
380 // It is update for us
381 System.out.println("Coming update in FlowDatacommitHandler");
382 updates.put(key, flow);
384 // It is addition for us
385 System.out.println("Coming add in FlowDatacommitHandler");
386 additions.put(key, flow);
391 * We are OK to go with execution of plan
395 public RpcResult<Void> finish() throws IllegalStateException {
397 commitToPlugin(this);
398 // We return true if internal transaction is successful.
399 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
400 return Rpcs.getRpcResult(true, null, null);
405 * We should rollback our preparation
409 public RpcResult<Void> rollback() throws IllegalStateException {
410 // NOOP - we did not modified any internal state during
411 // requestCommit phase
412 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
413 return Rpcs.getRpcResult(true, null, null);
417 public Status validate(IContainer container, NodeFlow dataObject) {
419 // container validation
422 String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
423 ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
424 containerName, this);
425 // flow Name validation
426 if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
427 || !dataObject.getFlowName().matches(NAMEREGEX)) {
428 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
431 if (dataObject.getNode() == null) {
432 return new Status(StatusCode.BADREQUEST, "Node is null");
435 if (switchManager != null) {
436 for (Switch device : switchManager.getNetworkDevices()) {
437 node = (Node) device.getNode();
438 if (device.getNode().equals(dataObject.getNode())) {
444 return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
447 logger.debug("switchmanager is not set yet");
450 if (dataObject.getPriority() != null) {
451 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
452 return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
453 dataObject.getPriority()));
457 return new Status(StatusCode.SUCCESS);
460 private boolean flowEntryExists(NodeFlow config) {
461 // Flow name has to be unique on per table id basis
462 for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
463 if (entry.getValue().getFlowName().equals(config.getFlowName())
464 && entry.getValue().getTableId().equals(config.getTableId())) {
472 final class FlowEventListener implements SalFlowListener {
474 List<FlowAdded> addedFlows = new ArrayList<>();
475 List<FlowRemoved> removedFlows = new ArrayList<>();
476 List<FlowUpdated> updatedFlows = new ArrayList<>();
479 public void onFlowAdded(FlowAdded notification) {
480 System.out.println("added flow..........................");
481 addedFlows.add(notification);
485 public void onFlowRemoved(FlowRemoved notification) {
486 removedFlows.add(notification);
490 public void onFlowUpdated(FlowUpdated notification) {
491 updatedFlows.add(notification);
495 public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
500 public void onNodeErrorNotification(NodeErrorNotification notification) {
501 // TODO Auto-generated method stub
506 public void onNodeExperimenterErrorNotification(
507 NodeExperimenterErrorNotification notification) {
508 // TODO Auto-generated method stub
514 // Commented out DataChangeListene - to be used by Stats
516 // final class FlowDataListener implements DataChangeListener {
517 // private SalFlowService flowService;
519 // public FlowDataListener() {
524 // public void onDataChanged(
525 // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
526 // System.out.println("Coming in onDataChange..............");
527 // @SuppressWarnings("unchecked")
528 // Collection<DataObject> additions = (Collection<DataObject>)
529 // change.getCreatedConfigurationData();
530 // // we can check for getCreated, getDeleted or getUpdated from DataChange
532 // for (DataObject dataObject : additions) {
533 // if (dataObject instanceof NodeFlow) {
534 // NodeRef nodeOne = createNodeRef("foo:node:1");
535 // // validating the dataObject here
536 // AddFlowInputBuilder input = new AddFlowInputBuilder();
537 // input.setNode(((NodeFlow) dataObject).getNode());
538 // input.setNode(nodeOne);
539 // // input.setPriority(((NodeFlow) dataObject).getPriority());
540 // //input.setMatch(((NodeFlow) dataObject).getMatch());
541 // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
542 // //input.setCookie(((NodeFlow) dataObject).getCookie());
543 // //input.setAction(((NodeFlow) dataObject).getAction());
545 // @SuppressWarnings("unused")
546 // Future<RpcResult<java.lang.Void>> result =
547 // flowService.addFlow(input.build());
553 private static void updateLocalDatabase(NodeFlow entry, boolean add) {
555 updateSwViewes(entry, add);
557 updateNodeFlowsDB(entry, add);
562 * Update the node mapped flows database
564 private static void updateSwViewes(NodeFlow entry, boolean add) {
566 FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
567 installedSwView.put((FlowKey) entry, (Flow) entry);
569 originalSwView.remove(entry);
570 installedSwView.remove(entry);
576 public List<DataObject> get() {
578 List<DataObject> orderedList = new ArrayList<DataObject>();
579 ConcurrentMap<Integer, Flow> flowMap = staticFlows;
580 int maxKey = staticFlowsOrdinal.get(0).intValue();
581 for (int i = 0; i <= maxKey; i++) {
582 Flow entry = flowMap.get(i);
584 orderedList.add(entry);
591 public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
592 if (this instanceof FlowConsumerImpl) {
593 for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
594 Flow flow = flowEntry.getValue();
595 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
597 return flowEntry.getValue();
605 * Update the node mapped flows database
607 private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
608 Node node = (Node) entry.getNode();
610 List<Flow> nodeIndeces = nodeFlows.get(node);
611 if (nodeIndeces == null) {
615 nodeIndeces = new ArrayList<Flow>();
620 nodeIndeces.add((Flow) entry);
622 nodeIndeces.remove(entry);
625 // Update cache across cluster
626 if (nodeIndeces.isEmpty()) {
627 nodeFlows.remove(node);
629 nodeFlows.put(node, nodeIndeces);
633 private static NodeRef createNodeRef(String string) {
634 NodeKey key = new NodeKey(new NodeId(string));
635 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
638 return new NodeRef(path);