// Populate the SFF Connection Graph
//
SffGraph sffGraph = populateSffGraph(rsp);
-
SfcRspTransportProcessorBase transportProcessor = getTransportProcessor(sffGraph, rsp);
//
configureTransportEgressFlows(entry, sffGraph, transportProcessor);
}
+ // Flush the flows to the data store
+ this.sfcOfFlowProgrammer.flushFlows();
+
LOG.info("Processing complete for RSP: name [{}] Id [{}]", rsp.getName(), rsp.getPathId());
} catch (RuntimeException e) {
LOG.error("RuntimeException in processRenderedServicePath: ", e.getMessage(), e);
} finally {
+ // If there were any errors, purge any remaining flows so they're not written
+ this.sfcOfFlowProgrammer.purgeFlows();
sfcSynchronizer.unlock();
sfcOfProviderUtils.removeRsp(rsp.getPathId());
}
* @param rsp - the Rendered Service Path to delete
*/
public void deleteRenderedServicePath(RenderedServicePath rsp) {
- Set<NodeId> clearedSffNodeIDs = sfcOfFlowProgrammer.deleteRspFlowsAndClearSFFsIfNoRspExists(rsp.getPathId());
+ Set<NodeId> clearedSffNodeIDs = sfcOfFlowProgrammer.deleteRspFlows(rsp.getPathId());
for(NodeId sffNodeId : clearedSffNodeIDs){
setSffInitialized(sffNodeId, false);
}
}
@Override
- public Set<NodeId> deleteRspFlowsAndClearSFFsIfNoRspExists(final Long rspId) {
+ public Set<NodeId> deleteRspFlows(final Long rspId) {
sfcOfFlowWriter.deleteRspFlows(rspId);
- return sfcOfFlowWriter.clearSffsIfNoRspExists();
+ Set<NodeId> nodes = sfcOfFlowWriter.clearSffsIfNoRspExists();
+ sfcOfFlowWriter.deleteFlowSet();
+ return nodes;
+ }
+
+ @Override
+ public void flushFlows() {
+ this.sfcOfFlowWriter.flushFlows();
+ }
+
+ @Override
+ public void purgeFlows() {
+ this.sfcOfFlowWriter.purgeFlows();
}
/**
configureTableMatchAnyFlow(
getTableId(TABLE_INDEX_CLASSIFIER),
getTableId(TABLE_INDEX_TRANSPORT_INGRESS));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
FlowBuilder flowBuilder =
configureTableMatchAnyDropFlow(
getTableId(TABLE_INDEX_TRANSPORT_INGRESS));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
configureTableMatchAnyFlow(
getTableId(TABLE_INDEX_PATH_MAPPER),
getTableId(TABLE_INDEX_PATH_MAPPER_ACL));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
configureTableMatchAnyFlow(
getTableId(TABLE_INDEX_PATH_MAPPER_ACL),
getTableId(TABLE_INDEX_NEXT_HOP));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
configureTableMatchAnyFlow(
getTableId(TABLE_INDEX_NEXT_HOP),
getTableId(TABLE_INDEX_TRANSPORT_EGRESS));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
FlowBuilder flowBuilder =
configureTableMatchAnyDropFlow(
getTableId(TABLE_INDEX_TRANSPORT_EGRESS));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, flowBuilder);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, flowBuilder);
}
/**
configureTransportIngressFlow(
SfcOpenflowUtils.ETHERTYPE_IPV4,
SfcOpenflowUtils.IP_PROTOCOL_TCP);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlowTcp);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlowTcp);
FlowBuilder transportIngressFlowUdp =
configureTransportIngressFlow(
SfcOpenflowUtils.ETHERTYPE_IPV4,
SfcOpenflowUtils.IP_PROTOCOL_UDP);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlowUdp);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlowUdp);
}
/**
match.setVlanMatch(vlanBuilder.build());
FlowBuilder transportIngressFlow = configureTransportIngressFlow(match);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
}
/**
FlowBuilder transportIngressFlow =
configureTransportIngressFlow(match, getTableId(TABLE_INDEX_NEXT_HOP));
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
}
/**
public void configureMplsTransportIngressFlow(final String sffNodeName) {
FlowBuilder transportIngressFlow =
configureTransportIngressFlow(SfcOpenflowUtils.ETHERTYPE_MPLS_UCAST);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportIngressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportIngressFlow);
}
/**
"ingress_Transport_Arp_Flow",
match, isb);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, arpTransportIngressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, arpTransportIngressFlow);
}
@Override
"ingress_Transport_Arp_Flow",
match, isb);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, sfFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, sfFlow);
}
@Override
"ingress_Transport_Arp_Flow",
match, isb);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, sfFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, sfFlow);
}
} else {
pathMapperFlow = configurePathMapperFlow(pathId, match, actionList);
}
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, pathMapperFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, pathMapperFlow);
}
/**
} else {
pathMapperFlow = configurePathMapperFlow(pathId, match, actionList);
}
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, pathMapperFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, pathMapperFlow);
}
/**
// Set an idle timeout on this flow
ingressFlow.setIdleTimeout(PKTIN_IDLE_TIMEOUT);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, ingressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, ingressFlow);
}
//
}
FlowBuilder nextHopFlow = configureNextHopFlow(match, actionList, flowPriority);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
}
/**
}
FlowBuilder nextHopFlow = configureNextHopFlow(match, actionList);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
}
/**
FlowBuilder transportEgressFlow =
configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
/**
FlowBuilder transportEgressFlow =
configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
/**
FlowBuilder transportEgressFlow =
configureTransportEgressFlow(match, actionList, port, order, pathId, srcMac, dstMac);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
/**
FlowBuilder transportEgressFlow =
configureTransportEgressFlow(match, actionList, port, order);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
@Override
FlowBuilder transportEgressFlow =
configureTransportEgressFlow(match, actionList, port, order);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
/**
configureTransportEgressFlow(
match, new ArrayList<Action>(), port,
order, FLOW_PRIORITY_TRANSPORT_EGRESS + 10);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
@Override
configureTransportEgressFlow(
match, actionList, EMPTY_SWITCH_PORT,
order, FLOW_PRIORITY_TRANSPORT_EGRESS + 10);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, transportEgressFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, transportEgressFlow);
}
/**
SfcOpenflowUtils.createFlowBuilder(
getTableId(TABLE_INDEX_NEXT_HOP),
flowPriority, "nextHop", match, isb);
- sfcOfFlowWriter.writeFlowToConfig(flowRspId, sffNodeName, nextHopFlow);
+ sfcOfFlowWriter.writeFlow(flowRspId, sffNodeName, nextHopFlow);
}
private static BigInteger getMetadataSFP(long sfpId) {
*
* @return Node IDs from which initialization flows were removed.
*/
- public Set<NodeId> deleteRspFlowsAndClearSFFsIfNoRspExists(final Long rspId);
+ public Set<NodeId> deleteRspFlows(final Long rspId);
+
+ // Write any buffered flows to the data store
+ public void flushFlows();
+
+ // Purge any unwritten flows not written yet. This should be called upon
+ // errors, when the remaining buffered flows should not be written.
+ public void purgeFlows();
//Set FlowWriter implementation
public void setFlowWriter(SfcOfFlowWriterInterface sfcOfFlowWriter);
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.sfc.provider.OpendaylightSfc;
import org.opendaylight.sfc.provider.api.SfcDataStoreAPI;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.CheckedFuture;
+
/**
* Set of instructions in order to interact with MD-SAL datastore.
* <p>
*
* @since 2015-11-25
*/
public class SfcOfFlowWriterImpl implements SfcOfFlowWriterInterface {
- private static final int SCHEDULED_THREAD_POOL_SIZE = 1;
- private static final int QUEUE_SIZE = 1000;
- private static final int ASYNC_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
private static final long SHUTDOWN_TIME = 5;
- private static final String LOGSTR_THREAD_QUEUE_FULL = "Thread Queue is full, cant execute action: {}";
+ private static final String LOGSTR_THREAD_EXCEPTION = "Exception executing Thread: {}";
private static final Logger LOG = LoggerFactory.getLogger(SfcOfFlowWriterImpl.class);
+ //private ExecutorService threadPoolExecutorServiceDelete;
private ExecutorService threadPoolExecutorService;
+
private FlowBuilder flowBuilder;
+ // Store RspId to List of FlowDetails, to be able
+ // to delete all flows for a particular RSP
private Map<Long, List<FlowDetails>> rspNameToFlowsMap;
+ //temporary list of flows to be deleted. All of them will be transactionally deleted on
+ // deleteFlowSet() invokation
+ private Set<FlowDetails> setOfFlowsToDelete;
+ // temporary list of flows to be deleted. All of them will be transactionally deleted on
+ // flushFlows() invokation
+ private Set<FlowDetails> setOfFlowsToAdd;
+
public SfcOfFlowWriterImpl() {
- // Not using an Executors.newSingleThreadExecutor() here, since it creates
- // an Executor that uses a single worker thread operating off an unbounded
- // queue, and we want to be able to limit the size of the queue
- this.threadPoolExecutorService = new ThreadPoolExecutor(SCHEDULED_THREAD_POOL_SIZE, SCHEDULED_THREAD_POOL_SIZE,
- ASYNC_THREAD_POOL_KEEP_ALIVE_TIME_SECS, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(QUEUE_SIZE));
+
+ this.threadPoolExecutorService = Executors.newSingleThreadExecutor();;
this.rspNameToFlowsMap = new HashMap<Long, List<FlowDetails>>();
this.flowBuilder = null;
+ this.setOfFlowsToDelete = new HashSet<FlowDetails>();
+ this.setOfFlowsToAdd = new HashSet<FlowDetails>();
}
+ /**
+ * Shutdown the thread pool
+ */
+ @Override
public void shutdown() throws ExecutionException, InterruptedException {
// When we close this service we need to shutdown our executor!
threadPoolExecutorService.shutdown();
}
/**
- * A thread class used to write the flows to the data store.
+ * A thread class used to write the flows to the data store. It receives the list of flows to create at creation time.
+ * The flows are written together in a single data store transaction
*/
- class FlowWriterTask implements Runnable {
- String sffNodeName;
- InstanceIdentifier<Flow> flowInstanceId;
- FlowBuilder flowBuilder;
+ class FlowSetWriterTask implements Runnable {
+ Set<FlowDetails> flowsToWrite = new HashSet<FlowDetails>();
- public FlowWriterTask(String sffNodeName, InstanceIdentifier<Flow> flowInstanceId, FlowBuilder flowBuilder) {
- this.sffNodeName = sffNodeName;
- this.flowInstanceId = flowInstanceId;
- this.flowBuilder = flowBuilder;
+ public FlowSetWriterTask(Set<FlowDetails> flowsToWrite) {
+ this.flowsToWrite.addAll(flowsToWrite);
}
public void run(){
- if (!SfcDataStoreAPI.writeMergeTransactionAPI(
- this.flowInstanceId,
- this.flowBuilder.build(),
- LogicalDatastoreType.CONFIGURATION)) {
- LOG.error("{}: Failed to create Flow on node: {}", Thread.currentThread().getStackTrace()[1], this.sffNodeName);
+ WriteTransaction trans = OpendaylightSfc.getOpendaylightSfcObj().getDataProvider().newWriteOnlyTransaction();
+
+ LOG.debug("FlowSetWriterTask: starting addition of {} flows", flowsToWrite.size());
+
+ for (FlowDetails f: flowsToWrite) {
+
+ NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setId(new NodeId(f.sffNodeName));
+ nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+
+ InstanceIdentifier<Flow> iidFlow = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, nodeBuilder.getKey())
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, f.tableKey)
+ .child(Flow.class, f.flowKey)
+ .build();
+
+ // No need to read previously existing flows. Merge will take care of that
+ trans.merge(LogicalDatastoreType.CONFIGURATION, iidFlow, f.flow, true);
+ }
+
+ CheckedFuture<Void, TransactionCommitFailedException> submitFuture = trans.submit();
+
+ try {
+ submitFuture.checkedGet();
+ } catch (TransactionCommitFailedException e) {
+ LOG.error("deleteTransactionAPI: Transaction failed. Message: {}", e.getMessage());
}
}
}
/**
- * A thread class used to remove flows from the data store.
+ * A thread class used to transactionally delete a set of flows belonging to a given RSP in a single transaction
*/
- class FlowRemoverTask implements Runnable {
- String sffNodeName;
- InstanceIdentifier<Flow> flowInstanceId;
+ class FlowSetRemoverTask implements Runnable {
- public FlowRemoverTask(String sffNodeName, InstanceIdentifier<Flow> flowInstanceId) {
- this.flowInstanceId = flowInstanceId;
- this.sffNodeName = sffNodeName;
+ Set<FlowDetails> flowsToDelete = new HashSet<FlowDetails>();
+
+ public FlowSetRemoverTask(Set<FlowDetails> flowsToDelete) {
+ this.flowsToDelete.addAll(flowsToDelete);
}
public void run(){
- if (!SfcDataStoreAPI.deleteTransactionAPI(flowInstanceId, LogicalDatastoreType.CONFIGURATION)) {
- LOG.error("{}: Failed to remove Flow on node: {}", Thread.currentThread().getStackTrace()[1], sffNodeName);
+
+ WriteTransaction writeTx = OpendaylightSfc.getOpendaylightSfcObj().getDataProvider().newWriteOnlyTransaction();
+
+ LOG.debug("FlowSetRemoverTask: starting deletion of {} flows", flowsToDelete.size());
+
+ for (FlowDetails f: flowsToDelete) {
+
+ NodeBuilder nodeBuilder = new NodeBuilder();
+ nodeBuilder.setId(new NodeId(f.sffNodeName));
+ nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+
+ InstanceIdentifier<Flow> iidFlow = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class, nodeBuilder.getKey())
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, f.tableKey)
+ .child(Flow.class, f.flowKey)
+ .build();
+
+ writeTx.delete(LogicalDatastoreType.CONFIGURATION, iidFlow);
+ }
+
+ CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+ try {
+ submitFuture.checkedGet();
+ } catch (TransactionCommitFailedException e) {
+ LOG.error("deleteTransactionAPI: Transaction failed. Message: {}", e.getMessage());
}
}
}
/**
- * Internal class used to store the details of a flow for easy deletion later
+ * Internal class used to store the details of a flow for easy creation / deletion later
*/
- private class FlowDetails {
+ public class FlowDetails {
public String sffNodeName;
public FlowKey flowKey;
public TableKey tableKey;
+ public Flow flow;
- public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey) {
+ /**
+ * This constructor is used for storing flows to be added
+ */
+ public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey, Flow flow) {
this.sffNodeName = sffNodeName;
this.flowKey = flowKey;
this.tableKey = tableKey;
+ this.flow = flow;
+ }
+
+ /**
+ * This constructor is used for storing flows to be deleted. Only the path ids are needed
+ */
+ public FlowDetails(final String sffNodeName, FlowKey flowKey, TableKey tableKey) {
+ this(sffNodeName, flowKey, tableKey, null);
}
}
/**
- * Write a flow to the DataStore
+ * Store a flow to be written later. The flows will be stored per
+ * SFF and table. Later, when flushFlows() is called, all the flows
+ * will be written. The tableId is taken from the FlowBuilder.
*
* @param sffNodeName - which SFF to write the flow to
* @param flow - details of the flow to be written
*/
@Override
- public void writeFlowToConfig(Long rspId, String sffNodeName,
- FlowBuilder flow) {
+ public void writeFlow(Long rspId, String sffNodeName, FlowBuilder flow) {
+ this.flowBuilder = flow;
- // Create the NodeBuilder
- NodeBuilder nodeBuilder = new NodeBuilder();
- nodeBuilder.setId(new NodeId(sffNodeName));
- nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
-
- // Create the flow path, which will include the Node, Table, and Flow
- InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, nodeBuilder.getKey())
- .augmentation(FlowCapableNode.class)
- .child(Table.class, new TableKey(flow.getTableId()))
- .child(Flow.class, flow.getKey())
- .build();
+ LOG.debug("writeFlow storing flow to Node {}, table {}", sffNodeName, flow.getTableId());
- LOG.debug("writeFlowToConfig writing flow to Node {}, table {}", sffNodeName, flow.getTableId());
+ // Add the flow to the set of flows to be added in a single transaction
+ setOfFlowsToAdd.add(new FlowDetails(sffNodeName, flow.getKey(), new TableKey(flow.getTableId()), flowBuilder.build()));
+ // This will store the flow info and rspId for removal later
storeFlowDetails(rspId, sffNodeName, flow.getKey(), flow.getTableId());
+ }
- FlowWriterTask writerThread = new FlowWriterTask(sffNodeName, flowInstanceId, flow);
- try {
- threadPoolExecutorService.execute(writerThread);
- } catch (Exception ex) {
- LOG.error(LOGSTR_THREAD_QUEUE_FULL, ex.toString());
- }
+ @Override
+ public void removeFlow(String sffNodeName, FlowKey flowKey,
+ TableKey tableKey) {
+
+ LOG.debug("removeFlow: removing flow with key {} from table {} in sff {}", flowKey, tableKey, sffNodeName);
+
+ FlowDetails flowDetail = new FlowDetails(sffNodeName, flowKey, tableKey);
+ setOfFlowsToDelete.add(flowDetail);
}
/**
- * Remove a Flow from the DataStore
- *
- * @param sffNodeName - which SFF the flow is in
- * @param flowKey - The flow key of the flow to be removed
- * @param tableKey - The table the flow was written to
+ * From previous calls to writeFlowToConfig(), flows were stored per table
+ * and per SFF. Now the flows will be written, one table at at time per SFF.
*/
@Override
- public void removeFlowFromConfig(String sffNodeName, FlowKey flowKey,
- TableKey tableKey) {
+ public void flushFlows() {
- NodeBuilder nodeBuilder = new NodeBuilder();
- nodeBuilder.setId(new NodeId(sffNodeName));
- nodeBuilder.setKey(new NodeKey(nodeBuilder.getId()));
+ LOG.info("flushFlows: creating flowWriter task, writing [{}] flows.",
+ setOfFlowsToAdd.size());
- // Create the flow path
- InstanceIdentifier<Flow> flowInstanceId = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class, nodeBuilder.getKey())
- .augmentation(FlowCapableNode.class)
- .child(Table.class, tableKey)
- .child(Flow.class, flowKey)
- .build();
+ FlowSetWriterTask writerThread = new FlowSetWriterTask(setOfFlowsToAdd);
- FlowRemoverTask removerThread = new FlowRemoverTask(sffNodeName, flowInstanceId);
try {
- threadPoolExecutorService.execute(removerThread);
+ threadPoolExecutorService.execute(writerThread);
} catch (Exception ex) {
- LOG.error(LOGSTR_THREAD_QUEUE_FULL, ex.toString());
+ LOG.error(LOGSTR_THREAD_EXCEPTION, ex.toString());
}
+
+ // Clear the entries
+ setOfFlowsToAdd.clear();
+
+ }
+
+ /**
+ * Purge any unwritten flows not written-deleted yet. This should be called upon
+ * errors, when the remaining buffered flows should not be persisted
+ */
+ @Override
+ public void purgeFlows() {
+ setOfFlowsToAdd.clear();
+ setOfFlowsToDelete.clear();
}
/**
* @param flowKey - the flow key of the new flow
* @param tableId - the table the flow was written to
*/
- @Override
- public void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId) {
+ private void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId) {
List<FlowDetails> flowDetails = rspNameToFlowsMap.get(rspId);
if (flowDetails == null) {
flowDetails = new ArrayList<FlowDetails>();
}
}
+ /**
+ * Return the last flow builder
+ * Used mainly in Unit Testing
+ */
@Override
public FlowBuilder getFlowBuilder() {
return this.flowBuilder;
}
/**
- * Delete all flows created for the given rspId
+ * Delete all flows created for the given rspId (flows are stored in a deletion buffer;
+ * actual transactional deletion is performed upon deleteFlowSet() invokation
*
* @param rspId - the rspId to delete flows for
*/
}
rspNameToFlowsMap.remove(rspId);
- for (FlowDetails flowDetails : flowDetailsList) {
- removeFlowFromConfig(flowDetails.sffNodeName, flowDetails.flowKey, flowDetails.tableKey);
- }
+ setOfFlowsToDelete.addAll(flowDetailsList);
+ }
+
+ @Override
+ public void deleteFlowSet() {
+
+ LOG.info("deleteFlowSet: deleting {} flows", setOfFlowsToDelete.size());
+ FlowSetRemoverTask fsrt = new FlowSetRemoverTask(setOfFlowsToDelete);
+ try {
+ threadPoolExecutorService.execute(fsrt);
+ } catch (Exception ex) {
+ LOG.error(LOGSTR_THREAD_EXCEPTION, ex.toString());
+ }
+
+ // Clear the entries
+ setOfFlowsToDelete.clear();
+
}
@Override
// RSPs, which can be deleted.
Set<NodeId> sffNodeIDs = new HashSet<>();
if (rspNameToFlowsMap.size() == 1) {
+ LOG.debug("clearSffIfNoRspExists:only one rsp - deleting all remaining flows");
Set<Entry<Long, List<FlowDetails>>> entries = rspNameToFlowsMap.entrySet();
List<FlowDetails> flowDetailsList = entries.iterator().next().getValue();
for (FlowDetails flowDetails : flowDetailsList) {
- removeFlowFromConfig(flowDetails.sffNodeName, flowDetails.flowKey, flowDetails.tableKey);
+ setOfFlowsToDelete.add(flowDetails);
sffNodeIDs.add(new NodeId(flowDetails.sffNodeName));
}
rspNameToFlowsMap.clear();
}
return sffNodeIDs;
}
+
}
public interface SfcOfFlowWriterInterface {
//Write flows to MD-SAL datastore
- public void writeFlowToConfig(final Long rspId, final String sffNodeName, FlowBuilder flow);
+ public void writeFlow(final Long rspId, final String sffNodeName, FlowBuilder flow);
//Remove flows from MD-SAL datastore
- public void removeFlowFromConfig(final String sffNodeName, FlowKey flowKey, TableKey tableKey);
-
- //Store the flow details so it is easier to remove later
- public void storeFlowDetails(final Long rspId, final String sffNodeName, FlowKey flowKey, short tableId);
+ public void removeFlow(final String sffNodeName, FlowKey flowKey, TableKey tableKey);
//Write group to MD-SAL datastore
public void writeGroupToDataStore(String sffNodeName, GroupBuilder gb, boolean isAdd);
*/
public Set<NodeId> clearSffsIfNoRspExists();
- //Get flow
+ // Get the most recent Flow Builder
public FlowBuilder getFlowBuilder();
+ // Flush any flows that havent been written to the data store yet
+ public void flushFlows();
+
+ // Performs the deletion of any flows that havent been deleted from the data store yet
+ public void deleteFlowSet();
+
+ // Purge any flows that havent been written/deleted to/from the data store yet
+ public void purgeFlows();
+
// If the impl uses threads, shut it down
public void shutdown() throws ExecutionException, InterruptedException;
package org.opendaylight.sfc.ofrenderer;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.anyObject;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import org.mockito.stubbing.Answer;
flowBuilder = (FlowBuilder) args[2];
return null;
}
- }).when(this.sfcOfFlowWriter).writeFlowToConfig(anyLong(), anyString(), (FlowBuilder) anyObject());
+ }).when(this.sfcOfFlowWriter).writeFlow(anyLong(), anyString(), (FlowBuilder) anyObject());
// Configure Mockito to return the FlowBuilder stored by writeFlowToConfig()
// when getFlowBuilder() is called
"SFF_0", "00:00:00:00:00:02", "00:00:00:00:00:00", 2, "1", 0, false);
verify(this.flowProgrammerTestMoc, times(1)).configureVlanSfTransportEgressFlow(
"SFF_1", "00:00:00:00:00:07", "00:00:00:00:00:05", 3, "1", 0, false);
+
+ // verify flow flushing
+ verify(this.flowProgrammerTestMoc).flushFlows();
+ verify(this.flowProgrammerTestMoc).purgeFlows();
+
verifyNoMoreInteractions(this.flowProgrammerTestMoc);
}
verify(this.flowProgrammerTestMoc, times(1)).configureMplsLastHopTransportEgressFlow(eq("SFF_1"), anyString(),
anyString(), anyLong(), anyString(), anyLong());
+ // verify flow flushing
+ verify(this.flowProgrammerTestMoc).flushFlows();
+ verify(this.flowProgrammerTestMoc).purgeFlows();
+
verifyNoMoreInteractions(this.flowProgrammerTestMoc);
}
verify(this.flowProgrammerTestMoc).configureVxlanGpeAppCoexistTransportEgressFlow(
eq("SFF_1"), anyLong(), anyShort(), anyString());
+ // verify flow flushing
+ verify(this.flowProgrammerTestMoc).flushFlows();
+ verify(this.flowProgrammerTestMoc).purgeFlows();
+
verifyNoMoreInteractions(this.flowProgrammerTestMoc);
}
verify(this.flowProgrammerTestMoc).configureVxlanGpeAppCoexistTransportEgressFlow(
"SFF_0", 0, (short) 254, "192.168.0.2");
+ // verify flow flushing
+ verify(this.flowProgrammerTestMoc).flushFlows();
+ verify(this.flowProgrammerTestMoc).purgeFlows();
+
verifyNoMoreInteractions(this.flowProgrammerTestMoc);
}
verify(this.flowProgrammerTestMoc, times(1)).configureVlanSfTransportEgressFlow(
eq("SFF_1"), anyString(), anyString(), anyInt(), anyString(), anyLong(), eq(true));
+ // verify flow flushing
+ verify(this.flowProgrammerTestMoc).flushFlows();
+ verify(this.flowProgrammerTestMoc).purgeFlows();
+
verifyNoMoreInteractions(this.flowProgrammerTestMoc);
}
}
return false;
}
+
+ /**
+ * Creates an Instance Identifier (path) for node with specified id
+ *
+ * @param nodeId the ID of the node
+ * @return the {@link InstanceIdentifier}
+ */
+ public static final InstanceIdentifier<Node> createNodePath(final NodeId nodeId) {
+ return InstanceIdentifier.builder(Nodes.class).child(Node.class, new NodeKey(nodeId)).build();
+ }
+
+ /**
+ * Creates a table path from a node ID and table ID
+ *
+ * @param nodeId the ID of the node
+ * @param tableId the ID of the table
+ * @return the {@link InstanceIdentifier}
+ */
+ public static final InstanceIdentifier<Table> createTablePath(final NodeId nodeId, final short tableId) {
+ return createNodePath(nodeId).builder()
+ .augmentation(FlowCapableNode.class)
+ .child(Table.class, new TableKey(tableId))
+ .build();
+ }
+
+ /**
+ * Creates a path for particular flow, by appending flow-specific information
+ * to table path.
+ *
+ * @param table the table iid
+ * @param flowKey the flow key
+ * @return the {@link InstanceIdentifier}
+ */
+ public static InstanceIdentifier<Flow> createFlowPath(final InstanceIdentifier<Table> table, final FlowKey flowKey) {
+ return table.child(Flow.class, flowKey);
+ }
+
+ /**
+ * Creates a path for particular flow, by appending flow-specific information
+ * to table path.
+ *
+ * @param table the table iid
+ * @param flowId the flow id
+ * @return the {@link InstanceIdentifier}
+ */
+ public static InstanceIdentifier<Flow> createFlowPath(final InstanceIdentifier<Table> table, final FlowId flowId) {
+ return createFlowPath(table, new FlowKey(flowId));
+ }
+
}