1 package org.opendaylight.controller.forwardingrulesmanager.consumer.impl;
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.EnumSet;
6 import java.util.HashMap;
7 import java.util.HashSet;
10 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.Future;
16 import org.opendaylight.controller.clustering.services.CacheConfigException;
17 import org.opendaylight.controller.clustering.services.CacheExistException;
18 import org.opendaylight.controller.clustering.services.IClusterContainerServices;
19 import org.opendaylight.controller.clustering.services.IClusterServices;
20 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
23 import org.opendaylight.controller.sal.common.util.Rpcs;
24 import org.opendaylight.controller.sal.core.IContainer;
25 import org.opendaylight.controller.sal.utils.ServiceHelper;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yangtools.concepts.Registration;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.binding.NotificationListener;
53 import org.opendaylight.yangtools.yang.common.RpcError;
54 import org.opendaylight.yangtools.yang.common.RpcResult;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 public class FlowConsumerImpl implements IForwardingRulesManager {
// Class-wide SLF4J logger.
59 protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
// Notification listener instance; the constructor registers it with the notification service.
60 private final FlowEventListener flowEventListener = new FlowEventListener();
// Handle returned by registerNotificationListener(flowEventListener) in the constructor.
61 private Registration<NotificationListener> listener1Reg;
// RPC service used by addFlow/removeFlow/updateFlow to send requests to the plugin.
62 private SalFlowService flowService;
63 // private FlowDataListener listener;
// Commit handler the constructor registers for the Flows configuration path.
64 private FlowDataCommitHandler commitHandler;
// NOTE(review): originalSwView and installedSwView are not assigned anywhere in the visible
// code but are dereferenced (preparePutEntry, flowEntryExists, updateSwViewes) - confirm
// where they are initialized.
65 private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
66 private static ConcurrentMap<FlowKey, Flow> installedSwView;
// Obtained through ServiceHelper.getGlobalInstance(...) in the constructor; used by allocateCaches().
67 private IClusterContainerServices clusterContainerService = null;
68 private IContainer container;
// Pattern validate() applies to flow names: one or more ASCII letters or digits.
69 private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
// Flows indexed by integer ordinal; read by get() and getWithName().
// NOTE(review): no assignment to staticFlows is visible - confirm it is initialized before use.
70 private static ConcurrentMap<Integer, Flow> staticFlows;
// The value under key 0 is seeded with 0 by the constructor and treated by get() as the highest ordinal.
71 private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
73 * Inactive flow list. This is for the global instance of FRM It will
74 * contain all the flow entries which were installed on the global container
75 * when the first container is created.
77 private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
80 * /* Per node indexing
// Per-node list of flows maintained by updateNodeFlowsDB().
// NOTE(review): no assignment to nodeFlows is visible - confirm it is initialized before use.
82 private static ConcurrentMap<Node, List<Flow>> nodeFlows;
83 private boolean inContainerMode; // being used by global instance only
// Wires this consumer into the SAL services: looks up the flow RPC service, registers the
// flow notification listener and the commit handler for the Flows path, fetches the cluster
// container service and seeds the static-flow ordinal map.
85 public FlowConsumerImpl() {
// Instance identifier of the top-level Flows container; the commit handler is registered on it below.
86 InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
87 flowService = FRMConsumerImpl.getProviderSession().getRpcService(SalFlowService.class);
// A missing RPC service is reported through the logger.
// NOTE(review): the remainder of this branch is not visible here - confirm whether construction continues.
89 if (null == flowService) {
90 logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
94 // listener = new FlowDataListener();
97 // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
99 // logger.error("Failed to listen on flow data modifcation events");
100 // System.out.println("Consumer SAL Service is down or NULL.");
// Subscribe the flow event listener to SAL flow notifications and keep the registration handle.
105 listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
// NOTE(review): this message is logged when registration returned null but does not say
// that anything failed - consider rewording.
107 if (null == listener1Reg) {
108 logger.error("Listener to listen on flow data modifcation events");
// Register the commit handler so configuration commits under the Flows path reach requestCommit().
112 commitHandler = new FlowDataCommitHandler();
113 FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
114 clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
115 IClusterContainerServices.class, this);
118 * If we are not the first cluster node to come up, do not initialize
119 * the static flow entries ordinal
// staticFlowsOrdinal is static, so only an empty map gets the initial (0 -> 0) entry.
121 if (staticFlowsOrdinal.size() == 0) {
122 staticFlowsOrdinal.put(0, Integer.valueOf(0));
// Asks the cluster container service to create the FRM caches "frm.originalSwView",
// "frm.installedSwView", "frm.staticFlows", "frm.staticFlowsOrdinal", "frm.inactiveFlows",
// "frm.nodeFlows" and "frm.groupFlows", each in TRANSACTIONAL cache mode.
126 private void allocateCaches() {
// Without a cluster container service no cache can be created; a warning is logged.
128 if (this.clusterContainerService == null) {
129 logger.warn("Un-initialized clusterContainerService, can't create cache");
134 clusterContainerService.createCache("frm.originalSwView",
135 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
136 clusterContainerService.createCache("frm.installedSwView",
137 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
138 clusterContainerService
139 .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
140 clusterContainerService.createCache("frm.staticFlowsOrdinal",
141 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
142 clusterContainerService.createCache("frm.inactiveFlows",
143 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
144 clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
145 clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
// Both failure modes are logged by exception name only.
// NOTE(review): the caught exception is not passed to the logger, so its message and stack trace are lost.
146 } catch (CacheConfigException cce) {
147 logger.error("CacheConfigException");
148 } catch (CacheExistException cce) {
149 logger.error("CacheExistException");
// Test helper: builds an AddFlowInput whose node reference points at "foo:node:1" and
// submits it through flowService.addFlow().
153 private void addFlowTest() {
155 NodeRef nodeOne = createNodeRef("foo:node:1");
156 AddFlowInputBuilder input1 = new AddFlowInputBuilder();
158 input1.setNode(nodeOne);
159 AddFlowInput firstMsg = input1.build();
// NOTE(review): confirm from the full file whether the addFlow call below is skipped when
// flowService is null; only the logging statement is visible here.
161 if (null == flowService) {
162 logger.error("ConsumerFlowService is NULL");
// The returned future is not inspected, hence the "unused" suppression.
164 @SuppressWarnings("unused")
165 Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
// NOTE(review): broad catch of Exception with only a TODO visible - confirm the failure is reported.
167 } catch (Exception e) {
168 // TODO Auto-generated catch block
174 * Adds flow to the southbound plugin and our internal database
// Copies the fields of the given configuration Flow into an AddFlowInput and sends it to
// the plugin through flowService.addFlow().
// NOTE(review): the path parameter is not referenced in the visible lines of this method.
179 private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
181 AddFlowInputBuilder input = new AddFlowInputBuilder();
// Field-by-field copy from the configuration object to the RPC input.
183 input.setNode((dataObject).getNode());
184 input.setPriority((dataObject).getPriority());
185 input.setMatch((dataObject).getMatch());
186 input.setCookie((dataObject).getCookie());
187 input.setInstructions((dataObject).getInstructions());
188 input.setBufferId(dataObject.getBufferId());
189 input.setTableId(dataObject.getTableId());
190 input.setOutPort(dataObject.getOutPort());
191 input.setOutGroup(dataObject.getOutGroup());
192 input.setIdleTimeout(dataObject.getIdleTimeout());
193 input.setHardTimeout(dataObject.getHardTimeout());
194 input.setFlowName(dataObject.getFlowName());
195 input.setFlags(dataObject.getFlags());
196 input.setCookieMask(dataObject.getCookieMask());
197 input.setContainerName(dataObject.getContainerName());
198 input.setBarrier(dataObject.isBarrier());
199 input.setInstallHw(dataObject.isInstallHw());
200 input.setStrict(dataObject.isStrict());
202 // updating the staticflow cache
204 * Commented out... as in many other places... use of ClusteringServices
205 * is breaking things insufficient time to debug Integer ordinal =
206 * staticFlowsOrdinal.get(0); staticFlowsOrdinal.put(0, ++ordinal);
207 * staticFlows.put(ordinal, dataObject);
210 // We send flow to the southbound plugin
// The result returned by addFlow() is not examined.
212 flowService.addFlow(input.build());
215 * Commented out as this will also break due to improper use of
216 * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, true);
221 * Removes flow from the southbound plugin and our internal database
// Copies the fields of the given configuration Flow into a RemoveFlowInput and sends it to
// the plugin through flowService.removeFlow().
// NOTE(review): the path parameter is not referenced in the visible lines of this method.
226 private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
228 RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
// Field-by-field copy from the configuration object to the RPC input.
229 input.setNode((dataObject).getNode());
230 input.setPriority((dataObject).getPriority());
231 input.setMatch((dataObject).getMatch());
232 input.setCookie((dataObject).getCookie());
233 input.setInstructions((dataObject).getInstructions());
234 input.setBufferId(dataObject.getBufferId());
235 input.setTableId(dataObject.getTableId());
236 input.setOutPort(dataObject.getOutPort());
237 input.setOutGroup(dataObject.getOutGroup());
238 input.setIdleTimeout(dataObject.getIdleTimeout());
239 input.setHardTimeout(dataObject.getHardTimeout());
240 input.setFlowName(dataObject.getFlowName());
241 input.setFlags(dataObject.getFlags());
242 input.setCookieMask(dataObject.getCookieMask());
243 input.setContainerName(dataObject.getContainerName());
244 input.setBarrier(dataObject.isBarrier());
245 input.setInstallHw(dataObject.isInstallHw());
246 input.setStrict(dataObject.isStrict());
247 // updating the staticflow cache
249 * Commented out due to problems caused by improper use of
250 * ClusteringServices Integer ordinal = staticFlowsOrdinal.get(0);
251 * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
255 // We send flow to the southbound plugin
// The result returned by removeFlow() is not examined.
256 flowService.removeFlow(input.build());
259 * Commented out due to problems caused by improper use of
260 * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, false);
265 * Update flow to the southbound plugin and our internal database
// Builds an UpdateFlowInput for the given Flow (node reference plus an UpdatedFlow populated
// via fieldsFrom(dataObject)) and sends it through flowService.updateFlow().
// NOTE(review): the path parameter is not referenced in the visible lines of this method.
270 private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
272 UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
273 UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
// Unlike addFlow/removeFlow, the flow fields are copied in one call instead of setter by setter.
274 updatedflowbuilder.fieldsFrom(dataObject);
275 input.setNode(dataObject.getNode());
276 input.setUpdatedFlow(updatedflowbuilder.build());
278 // updating the staticflow cache
280 * Commented out due to problems caused by improper use of
281 * ClusteringServices. Integer ordinal = staticFlowsOrdinal.get(0);
282 * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
286 // We send flow to the southbound plugin
// The result returned by updateFlow() is not examined.
287 flowService.updateFlow(input.build());
290 * Commented out due to problems caused by improper use of
291 * ClusteringServices. updateLocalDatabase((NodeFlow) dataObject, true);
// Pushes the contents of a transaction's modification to the plugin: created Flow entries go
// to addFlow(), entries that are updated but not created go to updateFlow(), and removed
// identifiers (resolved against the original configuration data) go to removeFlow().
// Only entries whose value is a Flow are considered; each is passed to validate() first.
// NOTE(review): how the validate() result gates the add/update/remove call is in lines not
// visible here - confirm against the full file.
295 @SuppressWarnings("unchecked")
296 private void commitToPlugin(internalTransaction transaction) {
297 Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification()
298 .getCreatedConfigurationData().entrySet();
301 * This little dance is because updatedEntries contains both created and
302 * modified entries The reason I created a new HashSet is because the
303 * collections we are returned are immutable.
305 Set<Entry<InstanceIdentifier<?>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<?>, DataObject>>();
306 updatedEntries.addAll(transaction.getModification().getUpdatedConfigurationData().entrySet());
307 updatedEntries.removeAll(createdEntries);
309 Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = transaction.getModification()
310 .getRemovedConfigurationData();
// NOTE(review): the return value of this call is discarded; it looks like a leftover statement.
311 transaction.getModification().getOriginalConfigurationData();
// Pass 1: newly created entries.
312 for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
313 if (entry.getValue() instanceof Flow) {
314 logger.debug("Coming add cc in FlowDatacommitHandler");
315 Flow flow = (Flow) entry.getValue();
316 boolean status = validate(flow);
320 addFlow(entry.getKey(), (Flow) entry.getValue());
// Pass 2: entries that were updated but are not part of the created set.
323 for (@SuppressWarnings("unused")
324 Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
325 if (entry.getValue() instanceof Flow) {
326 logger.debug("Coming update cc in FlowDatacommitHandler");
327 Flow flow = (Flow) entry.getValue();
328 boolean status = validate(flow);
332 updateFlow(entry.getKey(), (Flow) entry.getValue());
// Pass 3: removed identifiers; the removed object is looked up in the original configuration data.
336 for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
337 DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(instanceId);
338 if (removeValue instanceof Flow) {
339 logger.debug("Coming remove cc in FlowDatacommitHandler");
340 Flow flow = (Flow) removeValue;
341 boolean status = validate(flow);
345 removeFlow(instanceId, (Flow) removeValue);
// Commit handler registered by the FlowConsumerImpl constructor for the Flows path. For each
// requested commit it wraps the modification in an internalTransaction and calls
// prepareUpdate() on it.
352 private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
354 @SuppressWarnings("unchecked")
// NOTE(review): the return statement is outside the visible lines; presumably the prepared
// transaction is returned - confirm.
356 public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
357 // We should verify transaction
358 logger.debug("Coming in FlowDatacommitHandler");
359 internalTransaction transaction = new internalTransaction(modification);
360 transaction.prepareUpdate();
// Commit transaction created by FlowDataCommitHandler.requestCommit(). It holds the
// modification plus three maps (additions, updates, removals) filled during preparation;
// finish() hands the transaction to commitToPlugin().
365 private final class internalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
// The modification this transaction was created for; set once in the constructor.
367 private final DataModification<InstanceIdentifier<?>, DataObject> modification;
370 public DataModification<InstanceIdentifier<?>, DataObject> getModification() {
374 public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
375 this.modification = modification;
// Planned work, keyed by instance identifier.
// NOTE(review): commitToPlugin() re-derives its work from the modification itself and does
// not read these maps in the visible code.
378 Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
379 Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
380 Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
383 * We create a plan which flows will be added, which will be updated and
384 * which will be removed based on our internal state.
387 void prepareUpdate() {
// NOTE(review): the body of the loop over "puts" is not visible here; presumably it forwards
// Flow values to preparePutEntry() - confirm.
389 Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
390 for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
393 // removals = modification.getRemovedConfigurationData();
// Removed identifiers are resolved against the original configuration data; only Flow values are recorded.
394 Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
395 for (InstanceIdentifier<?> removal : removedData) {
396 DataObject value = modification.getOriginalConfigurationData().get(removal);
397 if (value instanceof Flow) {
398 removals.put(removal, (Flow) value);
// Classifies a put as an update when originalSwView already has an entry for the key,
// otherwise as an addition.
// NOTE(review): originalSwView is declared with FlowKey keys but is queried here with an
// InstanceIdentifier, so the lookup may never find a match - confirm the intended key.
404 private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
405 Flow original = originalSwView.get(key);
406 if (original != null) {
407 // It is update for us
408 updates.put(key, flow);
410 // It is addition for us
411 additions.put(key, flow);
416 * We are OK to go with execution of plan
// Executes the commit by delegating to commitToPlugin(this) and returns a successful
// RpcResult with a null payload and no errors.
420 public RpcResult<Void> finish() throws IllegalStateException {
422 commitToPlugin(this);
423 // We return true if internal transaction is successful.
424 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
425 return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
430 * We should rollback our preparation
// Performs no undo work; returns a successful RpcResult with a null payload and no errors.
434 public RpcResult<Void> rollback() throws IllegalStateException {
435 // NOOP - we did not modified any internal state during
436 // requestCommit phase
437 // return Rpcs.getRpcResult(true, null, Collections.emptySet());
438 return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
// Scans originalSwView for an entry whose flow name and table id both equal those of the
// given flow.
// NOTE(review): the statements inside and after the if are not visible here; presumably
// true is returned on a match and false otherwise - confirm.
// NOTE(review): getFlowName()/getTableId() of stored entries are dereferenced without a
// null check.
442 private boolean flowEntryExists(Flow flow) {
443 // Flow name has to be unique on per table id basis
444 for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
445 if (entry.getValue().getFlowName().equals(flow.getFlowName())
446 && entry.getValue().getTableId().equals(flow.getTableId())) {
// SalFlowListener implementation registered by the FlowConsumerImpl constructor. Added,
// removed and updated flow notifications are appended to in-memory lists; the remaining
// callbacks are stubs.
454 final class FlowEventListener implements SalFlowListener {
// Notifications received so far, in arrival order.
// NOTE(review): nothing in the visible code reads or clears these lists.
456 List<FlowAdded> addedFlows = new ArrayList<>();
457 List<FlowRemoved> removedFlows = new ArrayList<>();
458 List<FlowUpdated> updatedFlows = new ArrayList<>();
// Records a flow-added notification.
461 public void onFlowAdded(FlowAdded notification) {
462 addedFlows.add(notification);
// Records a flow-removed notification.
466 public void onFlowRemoved(FlowRemoved notification) {
467 removedFlows.add(notification);
// Records a flow-updated notification.
471 public void onFlowUpdated(FlowUpdated notification) {
472 updatedFlows.add(notification);
// Not implemented yet.
476 public void onNodeErrorNotification(NodeErrorNotification notification) {
477 // TODO Auto-generated method stub
// Not implemented yet.
482 public void onNodeExperimenterErrorNotification(NodeExperimenterErrorNotification notification) {
483 // TODO Auto-generated method stub
// Not implemented yet.
488 public void onSwitchFlowRemoved(SwitchFlowRemoved notification) {
489 // TODO Auto-generated method stub
495 // Commented out DataChangeListener - to be used by Stats
497 // final class FlowDataListener implements DataChangeListener {
498 // private SalFlowService flowService;
500 // public FlowDataListener() {
505 // public void onDataChanged(
506 // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
507 // System.out.println("Coming in onDataChange..............");
508 // @SuppressWarnings("unchecked")
509 // Collection<DataObject> additions = (Collection<DataObject>)
510 // change.getCreatedConfigurationData();
511 // // we can check for getCreated, getDeleted or getUpdated from DataChange
513 // for (DataObject dataObject : additions) {
514 // if (dataObject instanceof NodeFlow) {
515 // NodeRef nodeOne = createNodeRef("foo:node:1");
516 // // validating the dataObject here
517 // AddFlowInputBuilder input = new AddFlowInputBuilder();
518 // input.setNode(((NodeFlow) dataObject).getNode());
519 // input.setNode(nodeOne);
520 // // input.setPriority(((NodeFlow) dataObject).getPriority());
521 // //input.setMatch(((NodeFlow) dataObject).getMatch());
522 // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
523 // //input.setCookie(((NodeFlow) dataObject).getCookie());
524 // //input.setAction(((NodeFlow) dataObject).getAction());
526 // @SuppressWarnings("unused")
527 // Future<RpcResult<java.lang.Void>> result =
528 // flowService.addFlow(input.build());
// Checks a flow before it is sent to the plugin. The visible checks cover: the flow name
// (non-null, non-blank, matching NAMEREGEX), a non-null node, a priority within 0 - 65535
// when one is set, and the match and instructions via FRMUtil. A failure reason is kept in
// msg and logged at the end.
// NOTE(review): the assignments to result and the return statements are not visible here -
// confirm the exact return value for each failure path against the full file.
534 public boolean validate(Flow flow) {
536 String msg = ""; // Specific part of warn/error log
538 boolean result = true;
539 // flow Name validation
540 if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty() || !flow.getFlowName().matches(NAMEREGEX)) {
541 msg = "Invalid Flow name";
// Later checks are only evaluated while result is still true.
545 if (result == true && flow.getNode() == null) {
546 msg = "Node is null";
550 // TODO: Validate we are seeking to program a flow against a valid
// A null priority is accepted; only a set priority is range-checked.
553 if (result == true && flow.getPriority() != null) {
554 if (flow.getPriority() < 0 || flow.getPriority() > 65535) {
555 msg = String.format("priority %s is not in the range 0 - 65535", flow.getPriority());
562 * This is breaking due to some improper use of caches...
564 * if (flowEntryExists(flow)) { String error =
565 * "Entry with this name on specified table already exists";
567 * "Entry with this name on specified table already exists: {}" ,
568 * entry); logger.error(error); return; } if
569 * (originalSwView.containsKey(entry)) { logger.warn(
570 * "Operation Rejected: A flow with same match and priority exists on the target node"
571 * ); logger.trace("Aborting to install {}", entry); continue; }
// Match and instruction validation is delegated to FRMUtil; these failures are logged directly.
573 if (!FRMUtil.validateMatch(flow)) {
574 logger.error("Not a valid Match");
577 if (!FRMUtil.validateInstructions(flow)) {
578 logger.error("Not a valid Instruction");
// A failed validation is reported twice: once at warn level with the flow, once at error level.
581 if (result == false) {
582 logger.warn("Invalid Configuration for flow {}. The failure is {}", flow, msg);
583 logger.error("Invalid Configuration ({})", msg);
// Applies an add (add == true) or remove to the local bookkeeping by delegating to
// updateSwViewes() and then updateNodeFlowsDB().
// NOTE(review): the visible calls to this method are all inside commented-out code.
588 private static void updateLocalDatabase(NodeFlow entry, boolean add) {
590 updateSwViewes(entry, add);
592 updateNodeFlowsDB(entry, add);
597 * Update the original and installed software view maps
// Puts the entry into, or removes it from, both originalSwView and installedSwView.
// NOTE(review): the branch on "add" is not visible here; presumably the put calls run when
// add is true and the remove calls otherwise - confirm.
// NOTE(review): the same NodeFlow object is cast to both FlowKey (map key) and Flow (map
// value), and remove() is called with the uncast entry - confirm these casts can succeed
// at runtime.
599 private static void updateSwViewes(NodeFlow entry, boolean add) {
601 FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
602 installedSwView.put((FlowKey) entry, (Flow) entry);
604 originalSwView.remove(entry);
605 installedSwView.remove(entry);
// Collects flows from staticFlows in ascending ordinal order, walking keys 0 through the
// value stored under key 0 of staticFlowsOrdinal (inclusive).
// NOTE(review): any null check on the looked-up entry and the return statement are not
// visible here - confirm whether missing ordinals are skipped.
611 public List<DataObject> get() {
613 List<DataObject> orderedList = new ArrayList<DataObject>();
614 ConcurrentMap<Integer, Flow> flowMap = staticFlows;
615 int maxKey = staticFlowsOrdinal.get(0).intValue();
616 for (int i = 0; i <= maxKey; i++) {
617 Flow entry = flowMap.get(i);
619 orderedList.add(entry);
// Searches staticFlows for a flow whose node equals n and whose flow name equals name, and
// returns the first match.
// NOTE(review): the fall-through return value is not visible here.
// NOTE(review): if this method is a member of FlowConsumerImpl, the "this instanceof
// FlowConsumerImpl" check is always true.
// NOTE(review): flow.getNode() is compared with an org.opendaylight.controller.sal.core.Node
// via equals() - confirm the two types can ever compare equal.
626 public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
627 if (this instanceof FlowConsumerImpl) {
628 for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
629 Flow flow = flowEntry.getValue();
630 if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
632 return flowEntry.getValue();
640 * Update the node mapped flows database
// Maintains the per-node flow list in nodeFlows: fetches (or creates) the list for the
// entry's node, adds or removes the entry, then either drops the node's mapping when the
// list is empty or stores the list back.
// NOTE(review): the branches on "add" and the handling of a missing list on removal are not
// fully visible here - confirm against the full file.
// NOTE(review): entry.getNode() is cast to Node and entry is cast to Flow - confirm these
// casts can succeed at runtime.
642 private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
643 Node node = (Node) entry.getNode();
645 List<Flow> nodeIndeces = nodeFlows.get(node);
646 if (nodeIndeces == null) {
650 nodeIndeces = new ArrayList<Flow>();
655 nodeIndeces.add((Flow) entry);
657 nodeIndeces.remove(entry);
660 // Update cache across cluster
661 if (nodeIndeces.isEmpty()) {
662 nodeFlows.remove(node);
664 nodeFlows.put(node, nodeIndeces);
// Builds a NodeRef for the node with the given id: wraps the string in a NodeId/NodeKey and
// uses it in an instance identifier path of the form Nodes -> Node(key).
// NOTE(review): the end of the builder chain is not visible here.
668 private static NodeRef createNodeRef(String string) {
669 NodeKey key = new NodeKey(new NodeId(string));
670 InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
673 return new NodeRef(path);