1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
10 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.opendaylight.yangtools.yang.binding.NotificationListener;
54 import org.opendaylight.yangtools.yang.common.RpcError;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class FlowConsumerImpl implements IForwardingRulesManager {
60 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
61 private final FlowEventListener flowEventListener = new FlowEventListener();
62 private Registration<NotificationListener> listener1Reg;
63 private SalFlowService flowService;
64 // private FlowDataListener listener;
65 private FlowDataCommitHandler commitHandler;
66 private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
67 private static ConcurrentMap<FlowKey, Flow> installedSwView;
68 private IClusterContainerServices clusterContainerService = null;
69 private IContainer container;
70 private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
71 private static ConcurrentMap<Integer, Flow> staticFlows;
72 private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
74 * Inactive flow list. This is for the global instance of FRM It will
75 * contain all the flow entries which were installed on the global container
76 * when the first container is created.
78 private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
81 * /* Per node indexing
83 private static ConcurrentMap<Node, List<Flow>> nodeFlows;
84 private boolean inContainerMode; // being used by global instance only
/**
 * Obtains the flow RPC service, registers the flow notification listener and
 * the commit handler for the {@code Flows} configuration path, looks up the
 * cluster container service and seeds the static flow ordinal.
 *
 * Construction stops early (leaving the remaining fields unset) when the RPC
 * service is unavailable or the notification listener cannot be registered.
 */
public FlowConsumerImpl() {
    InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
    flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);

    if (null == flowService) {
        logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
        return;
    }

    // The data change listener (FlowDataListener) registration is intentionally
    // disabled; see the commented-out class further down in this file.

    listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);

    if (null == listener1Reg) {
        // The previous message did not say that the registration had failed.
        logger.error("Failed to register listener for flow notification events");
        return;
    }

    commitHandler = new FlowDataCommitHandler();
    FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
    clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
            IClusterContainerServices.class, this);
    // NOTE(review): allocateCaches() is not invoked anywhere in the visible
    // code - confirm whether it is meant to be called here.

    /*
     * If we are not the first cluster node to come up, do not initialize
     * the static flow entries ordinal
     */
    // putIfAbsent keeps an existing ordinal and avoids the check-then-put race.
    staticFlowsOrdinal.putIfAbsent(0, Integer.valueOf(0));
}
130 private void allocateCaches() {
132 if (this.clusterContainerService == null) {
133 logger.warn("Un-initialized clusterContainerService, can't create cache");
138 clusterContainerService.createCache("frm.originalSwView",
139 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140 clusterContainerService.createCache("frm.installedSwView",
141 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142 clusterContainerService
143 .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144 clusterContainerService.createCache("frm.staticFlowsOrdinal",
145 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146 clusterContainerService.createCache("frm.inactiveFlows",
147 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148 clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149 clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150 } catch (CacheConfigException cce) {
151 logger.error("CacheConfigException");
152 } catch (CacheExistException cce) {
153 logger.error("CacheExistException");
157 private void addFlowTest() {
159 NodeRef nodeOne = createNodeRef("foo:node:1");
160 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
162 input1.setNode(nodeOne);
163 AddFlowInput firstMsg = input1.build();
165 if (null != flowService) {
166 System.out.println(flowService.toString());
168 System.out.println("ConsumerFlowService is NULL");
170 @SuppressWarnings("unused")
171 Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
173 } catch (Exception e) {
174 // TODO Auto-generated catch block
180 * Adds flow to the southbound plugin and our internal database
185 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
187 AddFlowInputBuilder input = new AddFlowInputBuilder();
188 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
189 input.setNode((dataObject).getNode());
190 input.setPriority((dataObject).getPriority());
191 input.setMatch((dataObject).getMatch());
192 input.setCookie((dataObject).getCookie());
193 input.setInstructions((dataObject).getInstructions());
194 dataObject.getMatch().getLayer3Match();
195 for (int i = 0; i < inst.size(); i++) {
196 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
197 System.out.println("i = " + i + inst.get(i).toString());
200 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
202 // updating the staticflow cache
204 * Commented out... as in many other places... use of ClusteringServices is breaking things
205 * insufficient time to debug
206 Integer ordinal = staticFlowsOrdinal.get(0);
207 staticFlowsOrdinal.put(0, ++ordinal);
208 staticFlows.put(ordinal, dataObject);
211 // We send flow to the sounthbound plugin
212 flowService.addFlow(input.build());
214 * Commented out as this will also break due to improper use of ClusteringServices
215 updateLocalDatabase((NodeFlow) dataObject, true);
220 * Removes flow to the southbound plugin and our internal database
225 private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
227 RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
228 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
229 input.setNode((dataObject).getNode());
230 input.setPriority((dataObject).getPriority());
231 input.setMatch((dataObject).getMatch());
232 input.setCookie((dataObject).getCookie());
233 input.setInstructions((dataObject).getInstructions());
234 dataObject.getMatch().getLayer3Match();
235 for (int i = 0; i < inst.size(); i++) {
236 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
237 System.out.println("i = " + i + inst.get(i).toString());
240 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
242 // updating the staticflow cache
244 * Commented out due to problems caused by improper use of ClusteringServices
245 Integer ordinal = staticFlowsOrdinal.get(0);
246 staticFlowsOrdinal.put(0, ++ordinal);
247 staticFlows.put(ordinal, dataObject);
250 // We send flow to the sounthbound plugin
251 flowService.removeFlow(input.build());
253 * Commented out due to problems caused by improper use of ClusteringServices
254 updateLocalDatabase((NodeFlow) dataObject, false);
259 * Update flow to the southbound plugin and our internal database
264 private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
266 UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
267 UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
268 updatedflowbuilder.fieldsFrom(dataObject);
269 input.setNode(dataObject.getNode());
270 input.setUpdatedFlow(updatedflowbuilder.build());
272 // updating the staticflow cache
274 * Commented out due to problems caused by improper use of ClusteringServices.
275 Integer ordinal = staticFlowsOrdinal.get(0);
276 staticFlowsOrdinal.put(0, ++ordinal);
277 staticFlows.put(ordinal, dataObject);
280 // We send flow to the sounthbound plugin
281 flowService.updateFlow(input.build());
283 * Commented out due to problems caused by improper use of ClusteringServices.
284 updateLocalDatabase((NodeFlow) dataObject, true);
288 @SuppressWarnings("unchecked")
289 private void commitToPlugin(internalTransaction transaction) {
290 Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification().getCreatedConfigurationData().entrySet();
293 * This little dance is because updatedEntries contains both created and modified entries
294 * The reason I created a new HashSet is because the collections we are returned are immutable.
296 Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
297 updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
298 updatedEntries.removeAll(createdEntries);
300 Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification().getRemovedConfigurationData();
301 transaction.getModification().getOriginalConfigurationData();
302 for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
303 if(entry.getValue() instanceof Flow) {
304 System.out.println("Coming add cc in FlowDatacommitHandler");
305 addFlow(entry.getKey(), (Flow) entry.getValue());
308 for (@SuppressWarnings("unused")
309 Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
310 if(entry.getValue() instanceof Flow) {
311 System.out.println("Coming update cc in FlowDatacommitHandler");
312 updateFlow(entry.getKey(), (Flow) entry.getValue());
316 for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
317 DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
318 if(removeValue instanceof Flow) {
319 System.out.println("Coming remove cc in FlowDatacommitHandler");
320 removeFlow(instanceId, (Flow) removeValue);
327 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
329 @SuppressWarnings("unchecked")
331 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
332 // We should verify transaction
333 System.out.println("Coming in FlowDatacommitHandler");
334 internalTransaction transaction = new internalTransaction(modification);
335 transaction.prepareUpdate();
340 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
/** Modification this transaction was created for. */
private final DataModification<InstanceIdentifier<?>, DataObject> modification;

/** Flows planned for addition, keyed by their identifier. */
Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
/** Flows planned for update, keyed by their identifier. */
Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
/** Flows planned for removal, keyed by their identifier. */
Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();

/**
 * Creates a transaction for the given modification.
 *
 * @param modification
 *            configuration change to be committed
 */
public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
    this.modification = modification;
}

/**
 * @return the modification handed to the constructor
 */
@Override
public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
    return this.modification;
}
358 * We create a plan which flows will be added, which will be updated and
359 * which will be removed based on our internal state.
362 void prepareUpdate() {
364 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
365 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
367 // validating the DataObject
368 DataObject value = entry.getValue();
369 if(value instanceof Flow ) {
370 Flow flow = (Flow)value;
371 boolean status = validate(flow);
377 * This is breaking due to some improper use of caches...
379 if (flowEntryExists(flow)) {
380 String error = "Entry with this name on specified table already exists";
381 logger.warn("Entry with this name on specified table already exists: {}", entry);
385 if (originalSwView.containsKey(entry)) {
386 logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
387 logger.trace("Aborting to install {}", entry);
391 if (!FRMUtil.validateMatch(flow)) {
392 logger.error("Not a valid Match");
395 if (!FRMUtil.validateInstructions(flow)) {
396 logger.error("Not a valid Instruction");
400 * Commented out due to Clustering Services issues
401 * preparePutEntry(entry.getKey(), flow);
406 // removals = modification.getRemovedConfigurationData();
407 Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
408 for (InstanceIdentifier<?> removal : removedData) {
409 DataObject value = modification.getOriginalConfigurationData().get(removal);
410 if (value instanceof Flow) {
411 removals.put(removal, (Flow) value);
417 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
418 Flow original = originalSwView.get(key);
419 if (original != null) {
420 // It is update for us
421 System.out.println("Coming update in FlowDatacommitHandler");
422 updates.put(key, flow);
424 // It is addition for us
425 System.out.println("Coming add in FlowDatacommitHandler");
426 additions.put(key, flow);
431 * We are OK to go with execution of plan
435 public RpcResult<Void> finish() throws IllegalStateException {
437 commitToPlugin(this);
438 // We return true if internal transaction is successful.
439 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
440 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
445 * We should rollback our preparation
449 public RpcResult<Void> rollback() throws IllegalStateException {
450 // NOOP - we did not modified any internal state during
451 // requestCommit phase
452 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
453 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
457 public boolean validate(Flow flow) {
459 String msg = ""; // Specific part of warn/error log
461 boolean result = true;
462 // flow Name validation
463 if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty()
464 || !flow.getFlowName().matches(NAMEREGEX)) {
465 msg = "Invalid Flow name";
469 if (result == true && flow.getNode() == null) {
470 msg = "Node is null";
474 // TODO: Validate we are seeking to program a flow against a valid Node
476 if (result == true && flow.getPriority() != null) {
477 if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
478 msg = String.format("priority %s is not in the range 0 - 65535",
483 if (result == false) {
484 logger.warn("Invalid Configuration for flow {}. The failure is {}",flow,msg);
485 logger.error("Invalid Configuration ({})",msg);
490 private boolean flowEntryExists(Flow flow) {
491 // Flow name has to be unique on per table id basis
492 for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
493 if (entry.getValue().getFlowName().equals(flow.getFlowName())
494 && entry.getValue().getTableId().equals(flow.getTableId())) {
502 final class FlowEventListener implements SalFlowListener {
504 List<FlowAdded> addedFlows = new ArrayList<>();
505 List<FlowRemoved> removedFlows = new ArrayList<>();
506 List<FlowUpdated> updatedFlows = new ArrayList<>();
509 public void onFlowAdded(FlowAdded notification) {
510 System.out.println("added flow..........................");
511 addedFlows.add(notification);
515 public void onFlowRemoved(FlowRemoved notification) {
516 removedFlows.add(notification);
520 public void onFlowUpdated(FlowUpdated notification) {
521 updatedFlows.add(notification);
525 public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
530 public void onNodeErrorNotification(NodeErrorNotification notification) {
531 // TODO Auto-generated method stub
536 public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
537 // TODO Auto-generated method stub
// Commented out DataChangeListener - to be used by Stats
545 // final class FlowDataListener implements DataChangeListener {
546 // private SalFlowService flowService;
548 // public FlowDataListener() {
553 // public void onDataChanged(
554 // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
555 // System.out.println("Coming in onDataChange..............");
556 // @SuppressWarnings("unchecked")
557 // Collection<DataObject> additions = (Collection<DataObject>)
558 // change.getCreatedConfigurationData();
559 // // we can check for getCreated, getDeleted or getUpdated from DataChange
561 // for (DataObject dataObject : additions) {
562 // if (dataObject instanceof NodeFlow) {
563 // NodeRef nodeOne = createNodeRef("foo:node:1");
564 // // validating the dataObject here
565 // AddFlowInputBuilder input = new AddFlowInputBuilder();
566 // input.setNode(((NodeFlow) dataObject).getNode());
567 // input.setNode(nodeOne);
568 // // input.setPriority(((NodeFlow) dataObject).getPriority());
569 // //input.setMatch(((NodeFlow) dataObject).getMatch());
570 // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
571 // //input.setCookie(((NodeFlow) dataObject).getCookie());
572 // //input.setAction(((NodeFlow) dataObject).getAction());
574 // @SuppressWarnings("unused")
575 // Future<RpcResult<java.lang.Void>> result =
576 // flowService.addFlow(input.build());
582 private static void updateLocalDatabase(NodeFlow entry, boolean add) {
584 updateSwViewes(entry, add);
586 updateNodeFlowsDB(entry, add);
591 * Update the node mapped flows database
593 private static void updateSwViewes(NodeFlow entry, boolean add) {
595 FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
596 installedSwView.put((FlowKey) entry, (Flow) entry);
598 originalSwView.remove(entry);
599 installedSwView.remove(entry);
605 public List<DataObject> get() {
607 List<DataObject> orderedList = new ArrayList<DataObject>();
608 ConcurrentMap<Integer, Flow> flowMap = staticFlows;
609 int maxKey = staticFlowsOrdinal.get(0).intValue();
610 for (int i = 0; i <= maxKey; i++) {
611 Flow entry = flowMap.get(i);
613 orderedList.add(entry);
620 public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
621 if (this instanceof FlowConsumerImpl) {
622 for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
623 Flow flow = flowEntry.getValue();
624 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
626 return flowEntry.getValue();
634 * Update the node mapped flows database
636 private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
637 Node node = (Node) entry.getNode();
639 List<Flow> nodeIndeces = nodeFlows.get(node);
640 if (nodeIndeces == null) {
644 nodeIndeces = new ArrayList<Flow>();
649 nodeIndeces.add((Flow) entry);
651 nodeIndeces.remove(entry);
654 // Update cache across cluster
655 if (nodeIndeces.isEmpty()) {
656 nodeFlows.remove(node);
658 nodeFlows.put(node, nodeIndeces);
662 private static NodeRef createNodeRef(String string) {
663 NodeKey key = new NodeKey(new NodeId(string));
664 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
667 return new NodeRef(path);