import java.util.ArrayList;
import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-
-import org.opendaylight.controller.clustering.services.CacheConfigException;
-import org.opendaylight.controller.clustering.services.CacheExistException;
-import org.opendaylight.controller.clustering.services.IClusterContainerServices;
-import org.opendaylight.controller.clustering.services.IClusterServices;
+
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
import org.opendaylight.controller.md.sal.common.api.data.DataModification;
import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.utils.ServiceHelper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.Flows;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.config.rev130819.flows.FlowKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.config.rev131024.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableRef;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FlowConsumerImpl implements IForwardingRulesManager {
+public class FlowConsumerImpl {
protected static final Logger logger = LoggerFactory.getLogger(FlowConsumerImpl.class);
private final FlowEventListener flowEventListener = new FlowEventListener();
private Registration<NotificationListener> listener1Reg;
private SalFlowService flowService;
// private FlowDataListener listener;
- private FlowDataCommitHandler commitHandler;
- private static ConcurrentHashMap<FlowKey, Flow> originalSwView;
- private static ConcurrentMap<FlowKey, Flow> installedSwView;
- private IClusterContainerServices clusterContainerService = null;
- private IContainer container;
- private static final String NAMEREGEX = "^[a-zA-Z0-9]+$";
- private static ConcurrentMap<Integer, Flow> staticFlows;
- private static ConcurrentMap<Integer, Integer> staticFlowsOrdinal = new ConcurrentHashMap<Integer, Integer>();
- /*
- * Inactive flow list. This is for the global instance of FRM It will
- * contain all the flow entries which were installed on the global container
- * when the first container is created.
- */
- private static ConcurrentMap<FlowKey, Flow> inactiveFlows;
-
- /*
- * /* Per node indexing
- */
- private static ConcurrentMap<Node, List<Flow>> nodeFlows;
- private boolean inContainerMode; // being used by global instance only
+ private FlowDataCommitHandler commitHandler;
public FlowConsumerImpl() {
InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Flows.class).toInstance();
logger.error("Consumer SAL Service is down or NULL. FRM may not function as intended");
return;
}
-
- // listener = new FlowDataListener();
-
- // if (null ==
- // FRMConsumerImpl.getDataBrokerService().registerDataChangeListener(path,
- // listener)) {
- // logger.error("Failed to listen on flow data modifcation events");
- // System.out.println("Consumer SAL Service is down or NULL.");
- // return;
- // }
-
+
// For switch events
listener1Reg = FRMConsumerImpl.getNotificationService().registerNotificationListener(flowEventListener);
}
// addFlowTest();
commitHandler = new FlowDataCommitHandler();
- FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
- clusterContainerService = (IClusterContainerServices) ServiceHelper.getGlobalInstance(
- IClusterContainerServices.class, this);
- allocateCaches();
- /*
- * If we are not the first cluster node to come up, do not initialize
- * the static flow entries ordinal
- */
- if (staticFlowsOrdinal.size() == 0) {
- staticFlowsOrdinal.put(0, Integer.valueOf(0));
- }
+ FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
}
-
- private void allocateCaches() {
-
- if (this.clusterContainerService == null) {
- logger.warn("Un-initialized clusterContainerService, can't create cache");
- return;
- }
-
- try {
- clusterContainerService.createCache("frm.originalSwView",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.installedSwView",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService
- .createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.staticFlowsOrdinal",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.inactiveFlows",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.nodeFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.groupFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } catch (CacheConfigException cce) {
- logger.error("CacheConfigException");
- } catch (CacheExistException cce) {
- logger.error("CacheExistException");
- }
- }
-
- private void addFlowTest() {
- try {
- NodeRef nodeOne = createNodeRef("foo:node:1");
- AddFlowInputBuilder input1 = new AddFlowInputBuilder();
-
- input1.setNode(nodeOne);
- AddFlowInput firstMsg = input1.build();
-
- if (null == flowService) {
- logger.error("ConsumerFlowService is NULL");
- }
- @SuppressWarnings("unused")
- Future<RpcResult<AddFlowOutput>> result1 = flowService.addFlow(firstMsg);
-
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
+
/**
* Adds flow to the southbound plugin and our internal database
*
private void addFlow(InstanceIdentifier<?> path, Flow dataObject) {
AddFlowInputBuilder input = new AddFlowInputBuilder();
-
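+        // fieldsFrom() copies the shared flow fields (match, instructions, priority, timeouts, flags, ...) from the config Flow into the RPC input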
+ input.fieldsFrom(dataObject);
input.setNode((dataObject).getNode());
- input.setPriority((dataObject).getPriority());
- input.setMatch((dataObject).getMatch());
- input.setCookie((dataObject).getCookie());
- input.setInstructions((dataObject).getInstructions());
- input.setBufferId(dataObject.getBufferId());
- input.setTableId(dataObject.getTableId());
- input.setOutPort(dataObject.getOutPort());
- input.setOutGroup(dataObject.getOutGroup());
- input.setIdleTimeout(dataObject.getIdleTimeout());
- input.setHardTimeout(dataObject.getHardTimeout());
- input.setFlowName(dataObject.getFlowName());
- input.setFlags(dataObject.getFlags());
- input.setCookieMask(dataObject.getCookieMask());
- input.setContainerName(dataObject.getContainerName());
- input.setBarrier(dataObject.isBarrier());
- input.setInstallHw(dataObject.isInstallHw());
- input.setStrict(dataObject.isStrict());
-
- // updating the staticflow cache
- /*
- * Commented out... as in many other places... use of ClusteringServices
- * is breaking things insufficient time to debug Integer ordinal =
- * staticFlowsOrdinal.get(0); staticFlowsOrdinal.put(0, ++ordinal);
- * staticFlows.put(ordinal, dataObject);
- */
-
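+        // Reference the flow table this entry belongs to so the plugin knows where to install it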
+ input.setFlowTable(new FlowTableRef(createTableInstance(dataObject.getId(), dataObject.getNode())));
        // We send the flow to the southbound plugin
-
flowService.addFlow(input.build());
-
- /*
- * Commented out as this will also break due to improper use of
- * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, true);
- */
}
/**
* @param dataObject
*/
private void removeFlow(InstanceIdentifier<?> path, Flow dataObject) {
-
+
RemoveFlowInputBuilder input = new RemoveFlowInputBuilder();
+ input.fieldsFrom(dataObject);
input.setNode((dataObject).getNode());
- input.setPriority((dataObject).getPriority());
- input.setMatch((dataObject).getMatch());
- input.setCookie((dataObject).getCookie());
- input.setInstructions((dataObject).getInstructions());
- input.setBufferId(dataObject.getBufferId());
input.setTableId(dataObject.getTableId());
- input.setOutPort(dataObject.getOutPort());
- input.setOutGroup(dataObject.getOutGroup());
- input.setIdleTimeout(dataObject.getIdleTimeout());
- input.setHardTimeout(dataObject.getHardTimeout());
- input.setFlowName(dataObject.getFlowName());
- input.setFlags(dataObject.getFlags());
- input.setCookieMask(dataObject.getCookieMask());
- input.setContainerName(dataObject.getContainerName());
- input.setBarrier(dataObject.isBarrier());
- input.setInstallHw(dataObject.isInstallHw());
- input.setStrict(dataObject.isStrict());
- // updating the staticflow cache
- /*
- * Commented out due to problems caused by improper use of
- * ClusteringServices Integer ordinal = staticFlowsOrdinal.get(0);
- * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
- * dataObject);
- */
-
+ input.setFlowTable(new FlowTableRef(createTableInstance((long)dataObject.getTableId(), (dataObject).getNode())));
        // We send the flow to the southbound plugin
flowService.removeFlow(input.build());
-
- /*
- * Commented out due to problems caused by improper use of
- * ClusteringServices updateLocalDatabase((NodeFlow) dataObject, false);
- */
}
/**
* @param path
* @param dataObject
*/
- private void updateFlow(InstanceIdentifier<?> path, Flow dataObject) {
+ private void updateFlow(InstanceIdentifier<?> path, Flow updatedFlow, Flow originalFlow) {
UpdateFlowInputBuilder input = new UpdateFlowInputBuilder();
UpdatedFlowBuilder updatedflowbuilder = new UpdatedFlowBuilder();
- updatedflowbuilder.fieldsFrom(dataObject);
- input.setNode(dataObject.getNode());
- input.setUpdatedFlow(updatedflowbuilder.build());
-
- // updating the staticflow cache
- /*
- * Commented out due to problems caused by improper use of
- * ClusteringServices. Integer ordinal = staticFlowsOrdinal.get(0);
- * staticFlowsOrdinal.put(0, ++ordinal); staticFlows.put(ordinal,
- * dataObject);
- */
-
+ updatedflowbuilder.fieldsFrom(updatedFlow);
+ input.setNode(updatedFlow.getNode());
+ input.setUpdatedFlow(updatedflowbuilder.build());
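+        // The update RPC carries both the updated flow and the flow as it was before the change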
+ OriginalFlowBuilder ofb = new OriginalFlowBuilder(originalFlow);
+ input.setOriginalFlow(ofb.build());
        // We send the flow to the southbound plugin
flowService.updateFlow(input.build());
-
- /*
- * Commented out due to problems caused by improper use of
- * ClusteringServices. updateLocalDatabase((NodeFlow) dataObject, true);
- */
}
-
- @SuppressWarnings("unchecked")
+
private void commitToPlugin(internalTransaction transaction) {
Set<Entry<InstanceIdentifier<?>, DataObject>> createdEntries = transaction.getModification()
.getCreatedConfigurationData().entrySet();
addFlow(entry.getKey(), (Flow) entry.getValue());
}
}
- for (@SuppressWarnings("unused")
- Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
+
+ for (Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
if (entry.getValue() instanceof Flow) {
logger.debug("Coming update cc in FlowDatacommitHandler");
- Flow flow = (Flow) entry.getValue();
- boolean status = validate(flow);
+ Flow updatedFlow = (Flow) entry.getValue();
+ Flow originalFlow = (Flow) transaction.modification.getOriginalConfigurationData().get(entry.getKey());
+ boolean status = validate(updatedFlow);
if (!status) {
return;
}
- updateFlow(entry.getKey(), (Flow) entry.getValue());
+ updateFlow(entry.getKey(), updatedFlow, originalFlow);
}
}
logger.debug("Coming remove cc in FlowDatacommitHandler");
Flow flow = (Flow) removeValue;
boolean status = validate(flow);
+
if (!status) {
return;
}
+
removeFlow(instanceId, (Flow) removeValue);
-
}
}
-
}
- private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
-
+ private final class FlowDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
+
@SuppressWarnings("unchecked")
- @Override
- public DataCommitTransaction requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
+        @Override
+        public DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(DataModification<InstanceIdentifier<?>, DataObject> modification) {
// We should verify transaction
logger.debug("Coming in FlowDatacommitHandler");
internalTransaction transaction = new internalTransaction(modification);
public internalTransaction(DataModification<InstanceIdentifier<?>, DataObject> modification) {
this.modification = modification;
}
-
- Map<InstanceIdentifier<?>, Flow> additions = new HashMap<>();
- Map<InstanceIdentifier<?>, Flow> updates = new HashMap<>();
- Map<InstanceIdentifier<?>, Flow> removals = new HashMap<>();
-
+
/**
* We create a plan which flows will be added, which will be updated and
* which will be removed based on our internal state.
*
*/
- void prepareUpdate() {
+ void prepareUpdate() {
- Set<Entry<InstanceIdentifier<?>, DataObject>> puts = modification.getUpdatedConfigurationData().entrySet();
- for (Entry<InstanceIdentifier<?>, DataObject> entry : puts) {
- }
-
- // removals = modification.getRemovedConfigurationData();
- Set<InstanceIdentifier<?>> removedData = modification.getRemovedConfigurationData();
- for (InstanceIdentifier<?> removal : removedData) {
- DataObject value = modification.getOriginalConfigurationData().get(removal);
- if (value instanceof Flow) {
- removals.put(removal, (Flow) value);
- }
- }
-
- }
-
- private void preparePutEntry(InstanceIdentifier<?> key, Flow flow) {
- Flow original = originalSwView.get(key);
- if (original != null) {
- // It is update for us
- updates.put(key, flow);
- } else {
- // It is addition for us
- additions.put(key, flow);
- }
}
-
+
/**
* We are OK to go with execution of plan
*
*/
@Override
public RpcResult<Void> finish() throws IllegalStateException {
-
- commitToPlugin(this);
- // We return true if internal transaction is successful.
- // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+ commitToPlugin(this);
return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
}
*
*/
@Override
- public RpcResult<Void> rollback() throws IllegalStateException {
- // NOOP - we did not modified any internal state during
- // requestCommit phase
- // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+ public RpcResult<Void> rollback() throws IllegalStateException {
+ rollBackFlows(modification);
return Rpcs.getRpcResult(true, null, Collections.<RpcError> emptySet());
- }
+ }
+ }
- private boolean flowEntryExists(Flow flow) {
- // Flow name has to be unique on per table id basis
- for (ConcurrentMap.Entry<FlowKey, Flow> entry : originalSwView.entrySet()) {
- if (entry.getValue().getFlowName().equals(flow.getFlowName())
- && entry.getValue().getTableId().equals(flow.getTableId())) {
- return true;
- }
- }
- return false;
+ private void rollBackFlows(DataModification<InstanceIdentifier<?>, DataObject> modification) {
+ Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> createdEntries = modification.getCreatedConfigurationData().entrySet();
+
+        /*
+         * updatedEntries contains both created and modified entries; the returned
+         * collections are immutable, so copy them into a new HashSet and then
+         * remove the created entries, leaving only genuine updates.
+         */
+ Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> updatedEntries = new HashSet<Entry<InstanceIdentifier<? extends DataObject>, DataObject>>();
+ updatedEntries.addAll(modification.getUpdatedConfigurationData().entrySet());
+ updatedEntries.removeAll(createdEntries);
+
+        Set<InstanceIdentifier<?>> removeEntriesInstanceIdentifiers = modification.getRemovedConfigurationData();
+ for (Entry<InstanceIdentifier<?>, DataObject> entry : createdEntries) {
+            if (entry.getValue() instanceof Flow) {
+                removeFlow(entry.getKey(), (Flow) entry.getValue()); // because we are rolling back, remove what we would have added.
}
}
+
+ for (Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
+            if (entry.getValue() instanceof Flow) {
+                Flow updatedFlow = (Flow) entry.getValue();
+                Flow originalFlow = (Flow) modification.getOriginalConfigurationData().get(entry.getKey());
+                updateFlow(entry.getKey(), originalFlow, updatedFlow); // because we are rolling back, reinstall the original flow in place of the update
+ }
+ }
+
+        for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
+            DataObject removeValue = modification.getOriginalConfigurationData().get(instanceId);
+            if (removeValue instanceof Flow) {
+                addFlow(instanceId, (Flow) removeValue); // because we are rolling back, add what we would have removed.
+            }
+        }
+    }
final class FlowEventListener implements SalFlowListener {
List<FlowAdded> addedFlows = new ArrayList<>();
// TODO Auto-generated method stub
}
-
}
- // Commented out DataChangeListene - to be used by Stats
-
- // final class FlowDataListener implements DataChangeListener {
- // private SalFlowService flowService;
- //
- // public FlowDataListener() {
- //
- // }
- //
- // @Override
- // public void onDataChanged(
- // DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- // System.out.println("Coming in onDataChange..............");
- // @SuppressWarnings("unchecked")
- // Collection<DataObject> additions = (Collection<DataObject>)
- // change.getCreatedConfigurationData();
- // // we can check for getCreated, getDeleted or getUpdated from DataChange
- // Event class
- // for (DataObject dataObject : additions) {
- // if (dataObject instanceof NodeFlow) {
- // NodeRef nodeOne = createNodeRef("foo:node:1");
- // // validating the dataObject here
- // AddFlowInputBuilder input = new AddFlowInputBuilder();
- // input.setNode(((NodeFlow) dataObject).getNode());
- // input.setNode(nodeOne);
- // // input.setPriority(((NodeFlow) dataObject).getPriority());
- // //input.setMatch(((NodeFlow) dataObject).getMatch());
- // //input.setFlowTable(((NodeFlow) dataObject).getFlowTable());
- // //input.setCookie(((NodeFlow) dataObject).getCookie());
- // //input.setAction(((NodeFlow) dataObject).getAction());
- //
- // @SuppressWarnings("unused")
- // Future<RpcResult<java.lang.Void>> result =
- // flowService.addFlow(input.build());
- // }
- // }
- // }
- // }
-
public boolean validate(Flow flow) {
-
String msg = ""; // Specific part of warn/error log
boolean result = true;
// flow Name validation
- if (flow.getFlowName() == null || flow.getFlowName().trim().isEmpty() || !flow.getFlowName().matches(NAMEREGEX)) {
+ if (!FRMUtil.isNameValid(flow.getFlowName())) {
msg = "Invalid Flow name";
result = false;
}
+
// Node Validation
if (result == true && flow.getNode() == null) {
msg = "Node is null";
result = false;
}
}
-
- // Presence check
- /*
- * This is breaking due to some improper use of caches...
- *
- * if (flowEntryExists(flow)) { String error =
- * "Entry with this name on specified table already exists";
- * logger.warn(
- * "Entry with this name on specified table already exists: {}" ,
- * entry); logger.error(error); return; } if
- * (originalSwView.containsKey(entry)) { logger.warn(
- * "Operation Rejected: A flow with same match and priority exists on the target node"
- * ); logger.trace("Aborting to install {}", entry); continue; }
- */
+
if (!FRMUtil.validateMatch(flow)) {
logger.error("Not a valid Match");
result = false;
}
return result;
}
-
- private static void updateLocalDatabase(NodeFlow entry, boolean add) {
-
- updateSwViewes(entry, add);
-
- updateNodeFlowsDB(entry, add);
-
- }
-
- /*
- * Update the node mapped flows database
- */
- private static void updateSwViewes(NodeFlow entry, boolean add) {
- if (add) {
- FlowConsumerImpl.originalSwView.put((FlowKey) entry, (Flow) entry);
- installedSwView.put((FlowKey) entry, (Flow) entry);
- } else {
- originalSwView.remove(entry);
- installedSwView.remove(entry);
-
- }
- }
-
- @Override
- public List<DataObject> get() {
-
- List<DataObject> orderedList = new ArrayList<DataObject>();
- ConcurrentMap<Integer, Flow> flowMap = staticFlows;
- int maxKey = staticFlowsOrdinal.get(0).intValue();
- for (int i = 0; i <= maxKey; i++) {
- Flow entry = flowMap.get(i);
- if (entry != null) {
- orderedList.add(entry);
- }
- }
- return orderedList;
- }
-
- @Override
- public DataObject getWithName(String name, org.opendaylight.controller.sal.core.Node n) {
- if (this instanceof FlowConsumerImpl) {
- for (ConcurrentMap.Entry<Integer, Flow> flowEntry : staticFlows.entrySet()) {
- Flow flow = flowEntry.getValue();
- if (flow.getNode().equals(n) && flow.getFlowName().equals(name)) {
-
- return flowEntry.getValue();
- }
- }
- }
- return null;
- }
-
- /*
- * Update the node mapped flows database
- */
- private static void updateNodeFlowsDB(NodeFlow entry, boolean add) {
- Node node = (Node) entry.getNode();
-
- List<Flow> nodeIndeces = nodeFlows.get(node);
- if (nodeIndeces == null) {
- if (!add) {
- return;
- } else {
- nodeIndeces = new ArrayList<Flow>();
- }
- }
-
- if (add) {
- nodeIndeces.add((Flow) entry);
- } else {
- nodeIndeces.remove(entry);
- }
-
- // Update cache across cluster
- if (nodeIndeces.isEmpty()) {
- nodeFlows.remove(node);
- } else {
- nodeFlows.put(node, nodeIndeces);
- }
- }
-
- private static NodeRef createNodeRef(String string) {
- NodeKey key = new NodeKey(new NodeId(string));
- InstanceIdentifier<Node> path = InstanceIdentifier.builder().node(Nodes.class).node(Node.class, key)
- .toInstance();
-
- return new NodeRef(path);
+
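+    /**
+     * Builds the InstanceIdentifier of the Tables/Table config entry (keyed by
+     * table id and node) that the FlowTableRef in the flow RPCs points at.
+     */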
+ private InstanceIdentifier<?> createTableInstance(Long tableId, NodeRef nodeRef) {
+ Table table;
+ InstanceIdentifier<Table> tableInstance;
+ TableBuilder builder = new TableBuilder();
+ builder.setId(tableId);
+ builder.setKey(new TableKey(tableId, nodeRef));
+ table = builder.build();
+ tableInstance = InstanceIdentifier.builder(Tables.class).child(Table.class, table.getKey()).toInstance();
+ return tableInstance;
}
}
\ No newline at end of file
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.OriginalGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.group.update.UpdatedGroupBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.Meters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.config.rev131024.meters.MeterKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.OriginalMeterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.meter.update.UpdatedMeterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.band.type.BandType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MeterConsumerImpl implements IForwardingRulesManager {
+public class MeterConsumerImpl {
protected static final Logger logger = LoggerFactory.getLogger(MeterConsumerImpl.class);
private final MeterEventListener meterEventListener = new MeterEventListener();
private Registration<NotificationListener> meterListener;
private SalMeterService meterService;
private MeterDataCommitHandler commitHandler;
- private ConcurrentMap<MeterKey, Meter> originalSwMeterView;
- @SuppressWarnings("unused")
- private ConcurrentMap<MeterKey, Meter> installedSwMeterView;
- @SuppressWarnings("unused")
- private ConcurrentMap<Node, List<Meter>> nodeMeters;
- @SuppressWarnings("unused")
- private ConcurrentMap<MeterKey, Meter> inactiveMeters;
- @SuppressWarnings("unused")
- private IContainer container;
- private IClusterContainerServices clusterMeterContainerService = null;
-
public MeterConsumerImpl() {
InstanceIdentifier<? extends DataObject> path = InstanceIdentifier.builder(Meters.class).toInstance();
meterService = FRMConsumerImpl.getProviderSession().getRpcService(SalMeterService.class);
- clusterMeterContainerService = FRMConsumerImpl.getClusterContainerService();
- container = FRMConsumerImpl.getContainer();
-
- if (!(cacheStartup())) {
- logger.error("Unable to allocate/retrieve meter cache");
- System.out.println("Unable to allocate/retrieve meter cache");
- }
-
+
if (null == meterService) {
logger.error("Consumer SAL Meter Service is down or NULL. FRM may not function as intended");
System.out.println("Consumer SAL Meter Service is down or NULL.");
commitHandler = new MeterDataCommitHandler();
FRMConsumerImpl.getDataProviderService().registerCommitHandler(path, commitHandler);
}
-
- private boolean allocateMeterCaches() {
- if (this.clusterMeterContainerService == null) {
- logger.warn("Meter: Un-initialized clusterMeterContainerService, can't create cache");
- return false;
- }
-
- try {
- clusterMeterContainerService.createCache("frm.originalSwMeterView",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterMeterContainerService.createCache("frm.installedSwMeterView",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterMeterContainerService.createCache("frm.inactiveMeters",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- clusterMeterContainerService.createCache("frm.nodeMeters",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
- // TODO for cluster mode
- /*
- * clusterMeterContainerService.createCache(WORK_STATUS_CACHE,
- * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
- * IClusterServices.cacheMode.ASYNC));
- *
- * clusterMeterContainerService.createCache(WORK_ORDER_CACHE,
- * EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL,
- * IClusterServices.cacheMode.ASYNC));
- */
-
- } catch (CacheConfigException cce) {
- logger.error("Meter CacheConfigException");
- return false;
-
- } catch (CacheExistException cce) {
- logger.error(" Meter CacheExistException");
- }
-
- return true;
- }
-
- private void nonClusterMeterObjectCreate() {
- originalSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
- installedSwMeterView = new ConcurrentHashMap<MeterKey, Meter>();
- nodeMeters = new ConcurrentHashMap<Node, List<Meter>>();
- inactiveMeters = new ConcurrentHashMap<MeterKey, Meter>();
- }
-
- @SuppressWarnings({ "unchecked" })
- private boolean retrieveMeterCaches() {
- ConcurrentMap<?, ?> map;
-
- if (this.clusterMeterContainerService == null) {
- logger.warn("Meter: un-initialized clusterMeterContainerService, can't retrieve cache");
- nonClusterMeterObjectCreate();
- return false;
- }
-
- map = clusterMeterContainerService.getCache("frm.originalSwMeterView");
- if (map != null) {
- originalSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
- } else {
- logger.error("Retrieval of cache(originalSwMeterView) failed");
- return false;
- }
-
- map = clusterMeterContainerService.getCache("frm.installedSwMeterView");
- if (map != null) {
- installedSwMeterView = (ConcurrentMap<MeterKey, Meter>) map;
- } else {
- logger.error("Retrieval of cache(installedSwMeterView) failed");
- return false;
- }
-
- map = clusterMeterContainerService.getCache("frm.inactiveMeters");
- if (map != null) {
- inactiveMeters = (ConcurrentMap<MeterKey, Meter>) map;
- } else {
- logger.error("Retrieval of cache(inactiveMeters) failed");
- return false;
- }
-
- map = clusterMeterContainerService.getCache("frm.nodeMeters");
- if (map != null) {
- nodeMeters = (ConcurrentMap<Node, List<Meter>>) map;
- } else {
- logger.error("Retrieval of cache(nodeMeter) failed");
- return false;
- }
-
- return true;
- }
-
- private boolean cacheStartup() {
- if (allocateMeterCaches()) {
- if (retrieveMeterCaches()) {
- return true;
- }
- }
-
- return false;
- }
-
+
/**
* Adds Meter to the southbound plugin and our internal database
*
private Status addMeter(InstanceIdentifier<?> path, Meter meterAddDataObject) {
MeterKey meterKey = meterAddDataObject.getKey();
- if (null != meterKey && validateMeter(meterAddDataObject, FRMUtil.operation.ADD).isSuccess()) {
+ if (null != meterKey && validateMeter(meterAddDataObject).isSuccess()) {
AddMeterInputBuilder meterBuilder = new AddMeterInputBuilder();
- meterBuilder.setContainerName(meterAddDataObject.getContainerName());
- meterBuilder.setFlags(meterAddDataObject.getFlags());
- meterBuilder.setMeterBandHeaders(meterAddDataObject.getMeterBandHeaders());
+ meterBuilder.fieldsFrom(meterAddDataObject);
meterBuilder.setMeterId(new MeterId(meterAddDataObject.getId()));
meterBuilder.setNode(meterAddDataObject.getNode());
meterService.addMeter(meterBuilder.build());
*
* @param dataObject
*/
- private Status updateMeter(InstanceIdentifier<?> path, Meter meterUpdateDataObject) {
- MeterKey meterKey = meterUpdateDataObject.getKey();
+ private Status updateMeter(InstanceIdentifier<?> path,
+ Meter updatedMeter, Meter originalMeter) {
UpdatedMeterBuilder updateMeterBuilder = null;
- if (null != meterKey && validateMeter(meterUpdateDataObject, FRMUtil.operation.UPDATE).isSuccess()) { UpdateMeterInputBuilder updateMeterInputBuilder = new UpdateMeterInputBuilder();
+ if (validateMeter(updatedMeter).isSuccess()) {
+ UpdateMeterInputBuilder updateMeterInputBuilder = new UpdateMeterInputBuilder();
+ updateMeterInputBuilder.setNode(updatedMeter.getNode());
updateMeterBuilder = new UpdatedMeterBuilder();
- updateMeterBuilder.fieldsFrom(meterUpdateDataObject);
- updateMeterBuilder.setMeterId(new MeterId(meterUpdateDataObject.getId()));
-
+ updateMeterBuilder.fieldsFrom(updatedMeter);
+ updateMeterBuilder.setMeterId(new MeterId(updatedMeter.getId()));
updateMeterInputBuilder.setUpdatedMeter(updateMeterBuilder.build());
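+            // Carry the pre-update meter in the RPC input alongside the updated meter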
+ OriginalMeterBuilder originalMeterBuilder = new OriginalMeterBuilder(originalMeter);
+ updateMeterInputBuilder.setOriginalMeter(originalMeterBuilder.build());
meterService.updateMeter(updateMeterInputBuilder.build());
} else {
return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
private Status removeMeter(InstanceIdentifier<?> path, Meter meterRemoveDataObject) {
MeterKey meterKey = meterRemoveDataObject.getKey();
- if (null != meterKey && validateMeter(meterRemoveDataObject, FRMUtil.operation.DELETE).isSuccess()) {
+ if (null != meterKey && validateMeter(meterRemoveDataObject).isSuccess()) {
RemoveMeterInputBuilder meterBuilder = new RemoveMeterInputBuilder();
- meterBuilder.setContainerName(meterRemoveDataObject.getContainerName());
- meterBuilder.setNode(meterRemoveDataObject.getNode());
- meterBuilder.setFlags(meterRemoveDataObject.getFlags());
- meterBuilder.setMeterBandHeaders(meterRemoveDataObject.getMeterBandHeaders());
- meterBuilder.setMeterId(new MeterId(meterRemoveDataObject.getId()));
- meterBuilder.setNode(meterRemoveDataObject.getNode());
+ meterBuilder.fieldsFrom(meterRemoveDataObject);
+ meterBuilder.setNode(meterRemoveDataObject.getNode());
+ meterBuilder.setMeterId(new MeterId(meterRemoveDataObject.getId()));
meterService.removeMeter(meterBuilder.build());
} else {
return new Status(StatusCode.BADREQUEST, "Meter Key or attribute validation failed");
return new Status(StatusCode.SUCCESS);
}
- public Status validateMeter(Meter meter, FRMUtil.operation operation) {
- String containerName;
+ public Status validateMeter(Meter meter) {
String meterName;
Status returnStatus = null;
if (null != meter) {
- containerName = meter.getContainerName();
-
- if (null == containerName) {
- containerName = GlobalConstants.DEFAULT.toString();
- } else if (!FRMUtil.isNameValid(containerName)) {
- logger.error("Container Name is invalid %s" + containerName);
- returnStatus = new Status(StatusCode.BADREQUEST, "Container Name is invalid");
- return returnStatus;
- }
-
meterName = meter.getMeterName();
if (!FRMUtil.isNameValid(meterName)) {
logger.error("Meter Name is invalid %s" + meterName);
}
for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
- if (!meter.getFlags().isMeterBurst()) {
+ if (null != meter.getFlags() && !meter.getFlags().isMeterBurst()) {
if (0 < meter.getMeterBandHeaders().getMeterBandHeader().get(i).getBurstSize()) {
logger.error("Burst size should only be associated when Burst FLAG is set");
returnStatus = new Status(StatusCode.BADREQUEST,
if (null != returnStatus && !returnStatus.isSuccess()) {
return returnStatus;
- } else {
+ } else if (null != meter.getMeterBandHeaders()) {
BandType setBandType = null;
DscpRemark dscpRemark = null;
for (int i = 0; i < meter.getMeterBandHeaders().getMeterBandHeader().size(); i++) {
return new Status(StatusCode.SUCCESS);
}
+ private RpcResult<Void> commitToPlugin(InternalTransaction transaction) {
+ DataModification<InstanceIdentifier<?>, DataObject> modification = transaction.modification;
+ //get created entries
+ Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> createdEntries =
+ modification.getCreatedConfigurationData().entrySet();
+
+ //get updated entries
+ Set<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> updatedEntries =
+ new HashSet<Entry<InstanceIdentifier<? extends DataObject>, DataObject>>();
+
+ updatedEntries.addAll(modification.getUpdatedConfigurationData().entrySet());
+ updatedEntries.removeAll(createdEntries);
+
+ //get removed entries
+ Set<InstanceIdentifier<? extends DataObject>> removeEntriesInstanceIdentifiers =
+ modification.getRemovedConfigurationData();
+
+        for (Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : createdEntries) {
+            if (entry.getValue() instanceof Meter) {
+                addMeter(entry.getKey(), (Meter) entry.getValue());
+ }
+ }
+
+ for (Entry<InstanceIdentifier<?>, DataObject> entry : updatedEntries) {
+            if (entry.getValue() instanceof Meter) {
+                Meter originalMeter = (Meter) modification.getOriginalConfigurationData().get(entry.getKey());
+                Meter updatedMeter = (Meter) entry.getValue();
+                updateMeter(entry.getKey(), updatedMeter, originalMeter);
+ }
+ }
+
+        for (InstanceIdentifier<?> instanceId : removeEntriesInstanceIdentifiers) {
+            DataObject removeValue = modification.getOriginalConfigurationData().get(instanceId);
+            if (removeValue instanceof Meter) {
+                removeMeter(instanceId, (Meter) removeValue);
+ }
+ }
+
+ return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
+ }
+
final class InternalTransaction implements DataCommitTransaction<InstanceIdentifier<?>, DataObject> {
private final DataModification<InstanceIdentifier<?>, DataObject> modification;
this.modification = modification;
}
- Map<InstanceIdentifier<?>, Meter> additions = new HashMap<>();
- Map<InstanceIdentifier<?>, Meter> updates = new HashMap<>();
- Set<InstanceIdentifier<?>> removals = new HashSet<>();
-
/**
* We create a plan which flows will be added, which will be updated and
* which will be removed based on our internal state.
*
*/
- void prepareUpdate() {
-
- Set<Entry<InstanceIdentifier<?>, DataObject>> addMeter = modification.getCreatedConfigurationData().entrySet();
- for (Entry<InstanceIdentifier<?>, DataObject> entry : addMeter) {
- if (entry.getValue() instanceof Meter) {
- Meter meter = (Meter) entry.getValue();
- additions.put(entry.getKey(), meter);
- }
-
- }
+ void prepareUpdate() {
- Set<Entry<InstanceIdentifier<?>, DataObject>> updateMeter = modification.getUpdatedConfigurationData().entrySet();
- for (Entry<InstanceIdentifier<?>, DataObject> entry : updateMeter) {
- if (entry.getValue() instanceof Meter) {
- Meter meter = (Meter) entry.getValue();
- ///will be fixed once getUpdatedConfigurationData returns only updated data not created data with it.
- if (!additions.containsKey(entry.getKey())) {
- updates.put(entry.getKey(), meter);
- }
- }
- }
-
- removals = modification.getRemovedConfigurationData();
}
/**
@Override
public RpcResult<Void> finish() throws IllegalStateException {
- RpcResult<Void> rpcStatus = commitToPlugin(this);
- // We return true if internal transaction is successful.
- // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+ RpcResult<Void> rpcStatus = commitToPlugin(this);
return rpcStatus;
}
*
*/
@Override
- public RpcResult<Void> rollback() throws IllegalStateException {
- // NOOP - we did not modified any internal state during
- // requestCommit phase
- // return Rpcs.getRpcResult(true, null, Collections.emptySet());
+ public RpcResult<Void> rollback() throws IllegalStateException {
return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
}
}
- private RpcResult<Void> commitToPlugin(InternalTransaction transaction) {
- for (Entry<InstanceIdentifier<?>, Meter> entry : transaction.additions.entrySet()) {
-
- if (!addMeter(entry.getKey(), entry.getValue()).isSuccess()) {
- return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
- }
- }
- for (Entry<InstanceIdentifier<?>, Meter> entry : transaction.updates.entrySet()) {
-
- if (!updateMeter(entry.getKey(), entry.getValue()).isSuccess()) {
- return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
- }
- }
-
- for (InstanceIdentifier<?> meterId : transaction.removals) {
- DataObject removeValue = transaction.getModification().getOriginalConfigurationData().get(meterId);
-
- if(removeValue instanceof Meter) {
- if(!removeMeter(meterId, (Meter)removeValue).isSuccess()) {
- return Rpcs.getRpcResult(false, null, Collections.<RpcError>emptySet());
- }
- }
- }
-
- return Rpcs.getRpcResult(true, null, Collections.<RpcError>emptySet());
- }
-
private final class MeterDataCommitHandler implements DataCommitHandler<InstanceIdentifier<?>, DataObject> {
@Override
public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<InstanceIdentifier<?>, DataObject> requestCommit(
// TODO Auto-generated method stub
}
- }
-
- @Override
- public List<DataObject> get() {
-
- List<DataObject> orderedList = new ArrayList<DataObject>();
- Collection<Meter> meterList = originalSwMeterView.values();
- for (Iterator<Meter> iterator = meterList.iterator(); iterator.hasNext();) {
- orderedList.add(iterator.next());
- }
- return orderedList;
- }
-
- @Override
- public DataObject getWithName(String name, Node n) {
- if (this instanceof MeterConsumerImpl) {
- Collection<Meter> meterList = originalSwMeterView.values();
- for (Iterator<Meter> iterator = meterList.iterator(); iterator.hasNext();) {
- Meter meter = iterator.next();
- if (meter.getNode().equals(n) && meter.getMeterName().equals(name)) {
-
- return meter;
- }
- }
- }
-
- return null;
- }
+ }
}