1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
10 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.binding.DataObject;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.opendaylight.yangtools.yang.binding.NotificationListener;
54 import org.opendaylight.yangtools.yang.common.RpcError;
55 import org.opendaylight.yangtools.yang.common.RpcResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 public class FlowConsumerImpl implements IForwardingRulesManager {
// Class-wide SLF4J logger.
60 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
// Notification listener instance; the constructor registers it with the notification service.
61 private final FlowEventListener flowEventListener = new FlowEventListener();
// Registration handle returned when flowEventListener is registered.
62 private Registration<NotificationListener> listener1Reg;
// RPC service through which flows are added, removed and updated.
63 private SalFlowService flowService;
64 // private FlowDataListener listener;
// Commit handler the constructor registers for the Flows path.
65 private FlowDataCommitHandler commitHandler;
// NOTE(review): no assignment to originalSwView or installedSwView is visible in this
// file, so they appear to be null wherever they are dereferenced -- confirm.
66 private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
67 private static ConcurrentMap<FlowKey, Flow> installedSwView;
// Clustering service looked up in the constructor; allocateCaches() checks it for null.
68 private IClusterContainerServices clusterContainerService = null;
// NOTE(review): 'container' is not referenced anywhere in the visible code.
69 private IContainer container;
// Pattern a flow name must match in validate(): one or more ASCII letters or digits.
70 private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
// NOTE(review): staticFlows has no visible initializer, yet get() and getWithName() read it.
71 private static ConcurrentMap<Integer, Flow> staticFlows;
// Under key 0 this map holds the ordinal that get() uses as its upper iteration bound.
72 private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
74 * Inactive flow list. This is for the global instance of FRM It will
75 * contain all the flow entries which were installed on the global container
76 * when the first container is created.
// NOTE(review): inactiveFlows is declared but never read or written in the visible code.
78 private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
81 * /* Per node indexing
// Maintained by updateNodeFlowsDB(); NOTE(review): no visible initializer -- confirm.
83 private static ConcurrentMap<Node, List<Flow>> nodeFlows;
84 private boolean inContainerMode; // being used by global instance only
/**
 * Wires this consumer into the services exposed by FRMConsumerImpl: looks up the
 * SalFlowService RPC, registers the flow notification listener, registers a commit
 * handler for the Flows path and looks up the clustering service.
 */
86 public FlowConsumerImpl() {
// Instance identifier of the Flows container; used below as the commit-handler path.
87 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
88 flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
// A missing RPC service is reported both to the logger and to stdout.
// NOTE(review): whether construction stops here is not visible in this excerpt -- confirm.
90 if (null == flowService) {
91 logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
92 System.out.println("Consumer SAL Service is down or NULL.");
96 // listener = new FlowDataListener();
99 // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
101 // logger.error("Failed to listen on flow data modifcation events");
102 // System.out.println("Consumer SAL Service is down or NULL.");
// Subscribe flowEventListener to notifications and keep the registration handle.
107 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
// NOTE(review): the messages below talk about data modification events and the SAL
// service, although the call that failed is the notification-listener registration.
109 if (null == listener1Reg) {
110 logger.error("Listener to listen on flow data modifcation events");
111 System.out.println("Consumer SAL Service is down or NULL.");
115 System.out.println("-------------------------------------------------------------------");
// Register a fresh FlowDataCommitHandler for 'path' with the data provider service.
116 commitHandler = new FlowDataCommitHandler();
117 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
// Global-instance lookup of the clustering service; the result is stored without a null check here.
118 clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
119 IClusterContainerServices.class, this);
122 * If we are not the first cluster node to come up, do not initialize
123 * the static flow entries ordinal
// Seed the ordinal map with 0 -> 0 only when it is found empty.
125 if (staticFlowsOrdinal.size() == 0) {
126 staticFlowsOrdinal.put(0, Integer.valueOf(0));
130 private void allocateCaches() {
132 if (this.clusterContainerService == null) {
133 logger.warn("Un-initialized clusterContainerService, can't create cache");
138 clusterContainerService.createCache("frm.originalSwView",
139 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140 clusterContainerService.createCache("frm.installedSwView",
141 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142 clusterContainerService
143 .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144 clusterContainerService.createCache("frm.staticFlowsOrdinal",
145 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
146 clusterContainerService.createCache("frm.inactiveFlows",
147 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148 clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149 clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
150 } catch (CacheConfigException cce) {
151 logger.error("CacheConfigException");
152 } catch (CacheExistException cce) {
153 logger.error("CacheExistException");
157 private void addFlowTest() {
159 NodeRef nodeOne = createNodeRef("foo:node:1");
160 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
162 input1.setNode(nodeOne);
163 AddFlowInput firstMsg = input1.build();
165 if (null != flowService) {
166 System.out.println(flowService.toString());
168 System.out.println("ConsumerFlowService is NULL");
170 @SuppressWarnings("unused")
171 Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
173 } catch (Exception e) {
174 // TODO Auto-generated catch block
180 * Adds flow to the southbound plugin and our internal database
185 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
187 AddFlowInputBuilder input = new AddFlowInputBuilder();
189 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
190 input.setNode((dataObject).getNode());
191 input.setPriority((dataObject).getPriority());
192 input.setMatch((dataObject).getMatch());
193 input.setCookie((dataObject).getCookie());
194 input.setInstructions((dataObject).getInstructions());
195 dataObject.getMatch().getLayer3Match();
196 for (int i = 0; i < inst.size(); i++) {
197 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
198 System.out.println("i = " + i + inst.get(i).toString());
201 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
203 // updating the staticflow cache
205 * Commented out... as in many other places... use of ClusteringServices is breaking things
206 * insufficient time to debug
207 Integer ordinal = staticFlowsOrdinal.get(0);
208 staticFlowsOrdinal.put(0, ++ordinal);
209 staticFlows.put(ordinal, dataObject);
212 // We send flow to the sounthbound plugin
213 flowService.addFlow(input.build());
215 * Commented out as this will also break due to improper use of ClusteringServices
216 updateLocalDatabase((NodeFlow) dataObject, true);
221 * Removes flow from the southbound plugin and our internal database
226 private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
228 RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
229 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
230 input.setNode((dataObject).getNode());
231 input.setPriority((dataObject).getPriority());
232 input.setMatch((dataObject).getMatch());
233 input.setCookie((dataObject).getCookie());
234 input.setInstructions((dataObject).getInstructions());
235 dataObject.getMatch().getLayer3Match();
236 for (int i = 0; i < inst.size(); i++) {
237 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
238 System.out.println("i = " + i + inst.get(i).toString());
241 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
243 // updating the staticflow cache
245 * Commented out due to problems caused by improper use of ClusteringServices
246 Integer ordinal = staticFlowsOrdinal.get(0);
247 staticFlowsOrdinal.put(0, ++ordinal);
248 staticFlows.put(ordinal, dataObject);
251 // We send flow to the sounthbound plugin
252 flowService.removeFlow(input.build());
254 * Commented out due to problems caused by improper use of ClusteringServices
255 updateLocalDatabase((NodeFlow) dataObject, false);
260 * Update flow to the southbound plugin and our internal database
265 private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
267 UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
268 UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
269 updatedflowbuilder.fieldsFrom(dataObject);
270 input.setNode(dataObject.getNode());
271 input.setUpdatedFlow(updatedflowbuilder.build());
273 // updating the staticflow cache
275 * Commented out due to problems caused by improper use of ClusteringServices.
276 Integer ordinal = staticFlowsOrdinal.get(0);
277 staticFlowsOrdinal.put(0, ++ordinal);
278 staticFlows.put(ordinal, dataObject);
281 // We send flow to the sounthbound plugin
282 flowService.updateFlow(input.build());
284 * Commented out due to problems caused by improper use of ClusteringServices.
285 updateLocalDatabase((NodeFlow) dataObject, true);
289 @SuppressWarnings("unchecked")
290 private void commitToPlugin(internalTransaction transaction) {
291 Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification().getCreatedConfigurationData().entrySet();
294 * This little dance is because updatedEntries contains both created and modified entries
295 * The reason I created a new HashSet is because the collections we are returned are immutable.
297 Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
298 updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
299 updatedEntries.removeAll(createdEntries);
301 Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification().getRemovedConfigurationData();
302 transaction.getModification().getOriginalConfigurationData();
303 for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
304 if(entry.getValue() instanceof Flow) {
305 System.out.println("Coming add cc in FlowDatacommitHandler");
306 addFlow(entry.getKey(), (Flow) entry.getValue());
309 for (@SuppressWarnings("unused")
310 Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
311 if(entry.getValue() instanceof Flow) {
312 System.out.println("Coming update cc in FlowDatacommitHandler");
313 updateFlow(entry.getKey(), (Flow) entry.getValue());
317 for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers ) {
318 DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
319 if(removeValue instanceof Flow) {
320 System.out.println("Coming remove cc in FlowDatacommitHandler");
321 removeFlow(instanceId, (Flow) removeValue);
328 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
330 @SuppressWarnings("unchecked")
332 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
333 // We should verify transaction
334 System.out.println("Coming in FlowDatacommitHandler");
335 internalTransaction transaction = new internalTransaction(modification);
336 transaction.prepareUpdate();
// Commit transaction created by FlowDataCommitHandler.requestCommit. It wraps the
// DataModification it was constructed with; finish() hands itself to commitToPlugin().
341 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
// The modification this transaction was created for.
343 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
346 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
350 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
351 this.modification = modification;
// Planned changes keyed by instance identifier. In the visible code 'removals' is
// filled by prepareUpdate(); 'additions' and 'updates' are written only by
// preparePutEntry(), whose single call site is commented out.
// NOTE(review): none of the three maps is read by commitToPlugin(), which works
// directly on the modification -- confirm they are still needed.
354 Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
355 Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
356 Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
359 * We create a plan which flows will be added, which will be updated and
360 * which will be removed based on our internal state.
// Validates every Flow found in the updated configuration data and records every
// removed Flow (taken from the original configuration data) in 'removals'.
363 void prepareUpdate() {
365 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
366 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
368 // validating the DataObject
369 DataObject value = entry.getValue();
// Entries that are not Flow instances are skipped.
370 if(value instanceof Flow ) {
371 Flow flow = (Flow)value;
372 boolean status = validate(flow);
// NOTE(review): the presence checks that follow this remark (flowEntryExists /
// originalSwView.containsKey) look like they sit inside a block comment whose
// delimiters are not visible in this excerpt -- confirm before relying on them.
// NOTE(review): containsKey(entry) is given the whole map entry, not a FlowKey.
378 * This is breaking due to some improper use of caches...
380 if (flowEntryExists(flow)) {
381 String error = "Entry with this name on specified table already exists";
382 logger.warn("Entry with this name on specified table already exists: {}", entry);
386 if (originalSwView.containsKey(entry)) {
387 logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
388 logger.trace("Aborting to install {}", entry);
// Match and instruction checks are delegated to FRMUtil; failures are logged at error level.
392 if (!FRMUtil.validateMatch(flow)) {
393 logger.error("Not a valid Match");
396 if (!FRMUtil.validateInstructions(flow)) {
397 logger.error("Not a valid Instruction");
401 * Commented out due to Clustering Services issues
402 * preparePutEntry(entry.getKey(), flow);
407 // removals = modification.getRemovedConfigurationData();
408 Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
409 for (InstanceIdentifier<?> removal : removedData) {
// The removed object itself is looked up in the original configuration data.
410 DataObject value = modification.getOriginalConfigurationData().get(removal);
411 if (value instanceof Flow) {
412 removals.put(removal, (Flow) value);
// Classifies a put as update or addition depending on whether originalSwView
// already holds a value for the key. Currently has no active caller.
// NOTE(review): originalSwView is keyed by FlowKey but is queried here with an
// InstanceIdentifier -- confirm the lookup can ever succeed.
418 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
419 Flow original = originalSwView.get(key);
420 if (original != null) {
421 // It is update for us
422 System.out.println("Coming update in FlowDatacommitHandler");
423 updates.put(key, flow);
425 // It is addition for us
426 System.out.println("Coming add in FlowDatacommitHandler");
427 additions.put(key, flow);
432 * We are OK to go with execution of plan
// Applies the modification through commitToPlugin(this) and reports success with
// an empty error set; no failure result is produced in the visible code.
436 public RpcResult<Void> finish() throws IllegalStateException {
438 commitToPlugin(this);
439 // We return true if internal transaction is successful.
440 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
441 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
446 * We should rollback our preparation
// Returns a successful result with an empty error set.
450 public RpcResult<Void> rollback() throws IllegalStateException {
451 // NOOP - we did not modified any internal state during
452 // requestCommit phase
453 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
454 return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
458 public boolean validate(Flow flow) {
460 String msg = ""; // Specific part of warn/error log
462 boolean result = true;
463 // flow Name validation
464 if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty()
465 || !flow.getFlowName().matches(NAMEREGEX)) {
466 msg = "Invalid Flow name";
470 if (result == true && flow.getNode() == null) {
471 msg = "Node is null";
475 // TODO: Validate we are seeking to program a flow against a valid Node
477 if (result == true && flow.getPriority() != null) {
478 if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
479 msg = String.format("priority %s is not in the range 0 - 65535",
484 if (result == false) {
485 logger.warn("Invalid Configuration for flow {}. The failure is {}",flow,msg);
486 logger.error("Invalid Configuration ({})",msg);
491 private boolean flowEntryExists(Flow flow) {
492 // Flow name has to be unique on per table id basis
493 for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
494 if (entry.getValue().getFlowName().equals(flow.getFlowName())
495 && entry.getValue().getTableId().equals(flow.getTableId())) {
503 final class FlowEventListener implements SalFlowListener {
505 List<FlowAdded> addedFlows = new ArrayList<>();
506 List<FlowRemoved> removedFlows = new ArrayList<>();
507 List<FlowUpdated> updatedFlows = new ArrayList<>();
510 public void onFlowAdded(FlowAdded notification) {
511 System.out.println("added flow..........................");
512 addedFlows.add(notification);
516 public void onFlowRemoved(FlowRemoved notification) {
517 removedFlows.add(notification);
521 public void onFlowUpdated(FlowUpdated notification) {
522 updatedFlows.add(notification);
526 public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
531 public void onNodeErrorNotification(NodeErrorNotification notification) {
532 // TODO Auto-generated method stub
537 public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
538 // TODO Auto-generated method stub
544 // Commented out DataChangeListene - to be used by Stats
546 // final class FlowDataListener implements DataChangeListener {
547 // private SalFlowService flowService;
549 // public FlowDataListener() {
554 // public void onDataChanged(
555 // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
556 // System.out.println("Coming in onDataChange..............");
557 // @SuppressWarnings("unchecked")
558 // Collection<DataObject> additions = (Collection<DataObject>)
559 // change.getCreatedConfigurationData();
560 // // we can check for getCreated, getDeleted or getUpdated from DataChange
562 // for (DataObject dataObject : additions) {
563 // if (dataObject instanceof NodeFlow) {
564 // NodeRef nodeOne = createNodeRef("foo:node:1");
565 // // validating the dataObject here
566 // AddFlowInputBuilder input = new AddFlowInputBuilder();
567 // input.setNode(((NodeFlow) dataObject).getNode());
568 // input.setNode(nodeOne);
569 // // input.setPriority(((NodeFlow) dataObject).getPriority());
570 // //input.setMatch(((NodeFlow) dataObject).getMatch());
571 // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
572 // //input.setCookie(((NodeFlow) dataObject).getCookie());
573 // //input.setAction(((NodeFlow) dataObject).getAction());
575 // @SuppressWarnings("unused")
576 // Future<RpcResult<java.lang.Void>> result =
577 // flowService.addFlow(input.build());
583 private static void updateLocalDatabase(NodeFlow entry, boolean add) {
585 updateSwViewes(entry, add);
587 updateNodeFlowsDB(entry, add);
592 * Update the original and installed switch view maps
594 private static void updateSwViewes(NodeFlow entry, boolean add) {
596 FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
597 installedSwView.put((FlowKey) entry, (Flow) entry);
599 originalSwView.remove(entry);
600 installedSwView.remove(entry);
606 public List<DataObject> get() {
608 List<DataObject> orderedList = new ArrayList<DataObject>();
609 ConcurrentMap<Integer, Flow> flowMap = staticFlows;
610 int maxKey = staticFlowsOrdinal.get(0).intValue();
611 for (int i = 0; i <= maxKey; i++) {
612 Flow entry = flowMap.get(i);
614 orderedList.add(entry);
621 public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
622 if (this instanceof FlowConsumerImpl) {
623 for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
624 Flow flow = flowEntry.getValue();
625 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
627 return flowEntry.getValue();
635 * Update the node mapped flows database
637 private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
638 Node node = (Node) entry.getNode();
640 List<Flow> nodeIndeces = nodeFlows.get(node);
641 if (nodeIndeces == null) {
645 nodeIndeces = new ArrayList<Flow>();
650 nodeIndeces.add((Flow) entry);
652 nodeIndeces.remove(entry);
655 // Update cache across cluster
656 if (nodeIndeces.isEmpty()) {
657 nodeFlows.remove(node);
659 nodeFlows.put(node, nodeIndeces);
663 private static NodeRef createNodeRef(String string) {
664 NodeKey key = new NodeKey(new NodeId(string));
665 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
668 return new NodeRef(path);