1 package org.opendaylight.controller.forwardingrulesmanager_mdsal.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.EnumSet;
5 import java.util.HashMap;
8 import java.util.Map.Entry;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.Future;
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
19 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
21 import org.opendaylight.controller.sal.common.util.Rpcs;
22 import org.opendaylight.controller.sal.core.IContainer;
23 import org.opendaylight.controller.sal.utils.GlobalConstants;
24 import org.opendaylight.controller.sal.utils.ServiceHelper;
25 import org.opendaylight.controller.sal.utils.Status;
26 import org.opendaylight.controller.sal.utils.StatusCode;
27 import org.opendaylight.controller.switchmanager.ISwitchManager;
28 import org.opendaylight.controller.switchmanager.Switch;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.instruction.list.Instruction;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.NotificationListener;
53 import org.opendaylight.yangtools.yang.common.RpcResult;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 public class FlowConsumerImpl {
58 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
59 private FlowEventListener flowEventListener = new FlowEventListener();
60 private Registration<NotificationListener> listener1Reg;
61 private SalFlowService flowService;
62 // private FlowDataListener listener;
63 private FlowDataCommitHandler commitHandler;
64 private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
65 private static ConcurrentMap<FlowKey, Flow> installedSwView;
66 private IClusterContainerServices clusterContainerService = null;
67 private IContainer container;
68 private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
69 private static ConcurrentMap<Integer, Flow> staticFlows;
70 private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal;
72 * Inactive flow list. This is for the global instance of FRM It will
73 * contain all the flow entries which were installed on the global container
74 * when the first container is created.
76 private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
79 * /* Per node indexing
81 private static ConcurrentMap<Node, List<Flow>> nodeFlows;
82 private boolean inContainerMode; // being used by global instance only
84 public FlowConsumerImpl() {
85 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder().node(Flows.class).toInstance();
86 flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
88 if (null == flowService) {
89 logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
90 System.out.println("Consumer SAL Service is down or NULL.");
94 // listener = new FlowDataListener();
97 // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
99 // logger.error("Failed to listen on flow data modifcation events");
100 // System.out.println("Consumer SAL Service is down or NULL.");
105 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
107 if (null == listener1Reg) {
108 logger.error("Listener to listen on flow data modifcation events");
109 System.out.println("Consumer SAL Service is down or NULL.");
113 System.out.println("-------------------------------------------------------------------");
115 commitHandler = new FlowDataCommitHandler();
116 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
117 clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
118 IClusterContainerServices.class, this);
119 container = (IContainer) ServiceHelper.getGlobalInstance(IContainer.class, this);
121 * If we are not the first cluster node to come up, do not initialize
122 * the static flow entries ordinal
124 if (staticFlowsOrdinal.size() == 0) {
125 staticFlowsOrdinal.put(0, Integer.valueOf(0));
129 private void allocateCaches() {
131 if (this.clusterContainerService == null) {
132 logger.warn("Un-initialized clusterContainerService, can't create cache");
137 clusterContainerService.createCache("frm.originalSwView",
138 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
139 clusterContainerService.createCache("frm.installedSwView",
140 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
141 clusterContainerService
142 .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
143 clusterContainerService.createCache("frm.staticFlowsOrdinal",
144 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145 clusterContainerService.createCache("frm.inactiveFlows",
146 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
147 clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
148 clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
149 } catch (CacheConfigException cce) {
150 logger.error("CacheConfigException");
151 } catch (CacheExistException cce) {
152 logger.error("CacheExistException");
156 private void addFlowTest() {
158 NodeRef nodeOne = createNodeRef("foo:node:1");
159 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
161 input1.setNode(nodeOne);
162 AddFlowInput firstMsg = input1.build();
164 if (null != flowService) {
165 System.out.println(flowService.toString());
167 System.out.println("ConsumerFlowService is NULL");
169 @SuppressWarnings("unused")
170 Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
172 } catch (Exception e) {
173 // TODO Auto-generated catch block
179 * Adds flow to the southbound plugin and our internal database
184 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
186 AddFlowInputBuilder input = new AddFlowInputBuilder();
187 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
188 input.setNode((dataObject).getNode());
189 input.setPriority((dataObject).getPriority());
190 input.setMatch((dataObject).getMatch());
191 input.setCookie((dataObject).getCookie());
192 input.setInstructions((dataObject).getInstructions());
193 dataObject.getMatch().getLayer3Match();
194 for (int i = 0; i < inst.size(); i++) {
195 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
196 System.out.println("i = " + i + inst.get(i).toString());
199 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
201 // updating the staticflow cache
202 Integer ordinal = staticFlowsOrdinal.get(0);
203 staticFlowsOrdinal.put(0, ++ordinal);
204 staticFlows.put(ordinal, (Flow) dataObject);
206 // We send flow to the sounthbound plugin
207 flowService.addFlow(input.build());
208 updateLocalDatabase((NodeFlow) dataObject, true);
212 * Removes flow to the southbound plugin and our internal database
217 private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
219 RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
220 List<Instruction> inst = (dataObject).getInstructions().getInstruction();
221 input.setNode((dataObject).getNode());
222 input.setPriority((dataObject).getPriority());
223 input.setMatch((dataObject).getMatch());
224 input.setCookie((dataObject).getCookie());
225 input.setInstructions((dataObject).getInstructions());
226 dataObject.getMatch().getLayer3Match();
227 for (int i = 0; i < inst.size(); i++) {
228 System.out.println("i = " + i + inst.get(i).getInstruction().toString());
229 System.out.println("i = " + i + inst.get(i).toString());
232 System.out.println("Instruction list" + (dataObject).getInstructions().getInstruction().toString());
234 // updating the staticflow cache
235 Integer ordinal = staticFlowsOrdinal.get(0);
236 staticFlowsOrdinal.put(0, ++ordinal);
237 staticFlows.put(ordinal, dataObject);
239 // We send flow to the sounthbound plugin
240 flowService.removeFlow(input.build());
241 updateLocalDatabase((NodeFlow) dataObject, false);
244 @SuppressWarnings("unchecked")
245 private void commitToPlugin(internalTransaction transaction) {
246 for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.additions.entrySet()) {
247 System.out.println("Coming add cc in FlowDatacommitHandler");
248 addFlow(entry.getKey(), entry.getValue());
250 for (@SuppressWarnings("unused")
251 Entry<InstanceIdentifier<?>, Flow> entry : transaction.updates.entrySet()) {
252 System.out.println("Coming update cc in FlowDatacommitHandler");
253 // updateFlow(entry.getKey(),entry.getValue());
256 for (Entry<InstanceIdentifier<?>, Flow> entry : transaction.removals.entrySet()) {
257 System.out.println("Coming remove cc in FlowDatacommitHandler");
258 removeFlow(entry.getKey(), entry.getValue());
263 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
265 @SuppressWarnings("unchecked")
267 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
268 // We should verify transaction
269 System.out.println("Coming in FlowDatacommitHandler");
270 internalTransaction transaction = new internalTransaction(modification);
271 transaction.prepareUpdate();
276 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
278 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
281 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
285 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
286 this.modification = modification;
289 Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
290 Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
291 Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
294 * We create a plan which flows will be added, which will be updated and
295 * which will be removed based on our internal state.
298 void prepareUpdate() {
300 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
301 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
303 // validating the DataObject
305 Status status = validate(container, (NodeFlow) entry);
306 if (!status.isSuccess()) {
307 logger.warn("Invalid Configuration for flow {}. The failure is {}", entry, status.getDescription());
308 String error = "Invalid Configuration (" + status.getDescription() + ")";
313 if (flowEntryExists((NodeFlow) entry)) {
314 String error = "Entry with this name on specified table already exists";
315 logger.warn("Entry with this name on specified table already exists: {}", entry);
319 if (originalSwView.containsKey((FlowKey) entry)) {
320 logger.warn("Operation Rejected: A flow with same match and priority exists on the target node");
321 logger.trace("Aborting to install {}", entry);
324 if (!FRMUtil.validateMatch((NodeFlow) entry)) {
325 logger.error("Not a valid Match");
328 if (!FRMUtil.validateInstructions((NodeFlow) entry)) {
329 logger.error("Not a valid Instruction");
332 if (entry.getValue() instanceof Flow) {
333 Flow flow = (Flow) entry.getValue();
334 preparePutEntry(entry.getKey(), flow);
339 // removals = modification.getRemovedConfigurationData();
340 Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
341 for (InstanceIdentifier<?> removal : removedData) {
342 DataObject value = modification.getOriginalConfigurationData().get(removal);
343 if (value instanceof Flow) {
344 removals.put(removal, (Flow) value);
350 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
351 Flow original = originalSwView.get(key);
352 if (original != null) {
353 // It is update for us
354 System.out.println("Coming update in FlowDatacommitHandler");
355 updates.put(key, flow);
357 // It is addition for us
358 System.out.println("Coming add in FlowDatacommitHandler");
359 additions.put(key, flow);
364 * We are OK to go with execution of plan
368 public RpcResult<Void> finish() throws IllegalStateException {
370 commitToPlugin(this);
371 // We return true if internal transaction is successful.
372 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
373 return Rpcs.getRpcResult(true, null, null);
378 * We should rollback our preparation
382 public RpcResult<Void> rollback() throws IllegalStateException {
383 // NOOP - we did not modified any internal state during
384 // requestCommit phase
385 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
386 return Rpcs.getRpcResult(true, null, null);
390 public Status validate(IContainer container, NodeFlow dataObject) {
392 // container validation
395 String containerName = (container == null) ? GlobalConstants.DEFAULT.toString() : container.getName();
396 ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(ISwitchManager.class,
397 containerName, this);
398 // flow Name validation
399 if (dataObject.getFlowName() == null || dataObject.getFlowName().trim().isEmpty()
400 || !dataObject.getFlowName().matches(NAMEREGEX)) {
401 return new Status(StatusCode.BADREQUEST, "Invalid Flow name");
404 if (dataObject.getNode() == null) {
405 return new Status(StatusCode.BADREQUEST, "Node is null");
408 if (switchManager != null) {
409 for (Switch device : switchManager.getNetworkDevices()) {
410 node = (Node) device.getNode();
411 if (device.getNode().equals(dataObject.getNode())) {
417 return new Status(StatusCode.BADREQUEST, String.format("Node %s not found", node));
420 logger.debug("switchmanager is not set yet");
423 if (dataObject.getPriority() != null) {
424 if (dataObject.getPriority() < 0 || dataObject.getPriority() > 65535) {
425 return new Status(StatusCode.BADREQUEST, String.format("priority %s is not in the range 0 - 65535",
426 dataObject.getPriority()));
430 return new Status(StatusCode.SUCCESS);
433 private boolean flowEntryExists(NodeFlow config) {
434 // Flow name has to be unique on per table id basis
435 for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
436 if (entry.getValue().getFlowName().equals(config.getFlowName())
437 && entry.getValue().getTableId().equals(config.getTableId())) {
445 final class FlowEventListener implements SalFlowListener {
447 List<FlowAdded> addedFlows = new ArrayList<>();
448 List<FlowRemoved> removedFlows = new ArrayList<>();
449 List<FlowUpdated> updatedFlows = new ArrayList<>();
452 public void onFlowAdded(FlowAdded notification) {
453 System.out.println("added flow..........................");
454 addedFlows.add(notification);
458 public void onFlowRemoved(FlowRemoved notification) {
459 removedFlows.add(notification);
463 public void onFlowUpdated(FlowUpdated notification) {
464 updatedFlows.add(notification);
468 public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
474 // Commented out DataChangeListene - to be used by Stats
476 // final class FlowDataListener implements DataChangeListener {
477 // private SalFlowService flowService;
479 // public FlowDataListener() {
484 // public void onDataChanged(
485 // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
486 // System.out.println("Coming in onDataChange..............");
487 // @SuppressWarnings("unchecked")
488 // Collection<DataObject> additions = (Collection<DataObject>)
489 // change.getCreatedConfigurationData();
490 // // we can check for getCreated, getDeleted or getUpdated from DataChange
492 // for (DataObject dataObject : additions) {
493 // if (dataObject instanceof NodeFlow) {
494 // NodeRef nodeOne = createNodeRef("foo:node:1");
495 // // validating the dataObject here
496 // AddFlowInputBuilder input = new AddFlowInputBuilder();
497 // input.setNode(((NodeFlow) dataObject).getNode());
498 // input.setNode(nodeOne);
499 // // input.setPriority(((NodeFlow) dataObject).getPriority());
500 // //input.setMatch(((NodeFlow) dataObject).getMatch());
501 // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
502 // //input.setCookie(((NodeFlow) dataObject).getCookie());
503 // //input.setAction(((NodeFlow) dataObject).getAction());
505 // @SuppressWarnings("unused")
506 // Future<RpcResult<java.lang.Void>> result =
507 // flowService.addFlow(input.build());
513 private static void updateLocalDatabase(NodeFlow entry, boolean add) {
515 updateSwViewes(entry, add);
517 updateNodeFlowsDB(entry, add);
522 * Update the node mapped flows database
524 private static void updateSwViewes(NodeFlow entry, boolean add) {
526 FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
527 installedSwView.put((FlowKey) entry, (Flow) entry);
529 originalSwView.remove((Flow) entry);
530 installedSwView.remove((FlowKey) entry);
536 * Update the node mapped flows database
538 private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
539 Node node = (Node) entry.getNode();
541 List<Flow> nodeIndeces = nodeFlows.get(node);
542 if (nodeIndeces == null) {
546 nodeIndeces = new ArrayList<Flow>();
551 nodeIndeces.add((Flow) entry);
553 nodeIndeces.remove((Flow) entry);
556 // Update cache across cluster
557 if (nodeIndeces.isEmpty()) {
558 nodeFlows.remove(node);
560 nodeFlows.put(node, nodeIndeces);
564 private static NodeRef createNodeRef(String string) {
565 NodeKey key = new NodeKey(new NodeId(string));
566 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
569 return new NodeRef(path);