package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
-import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Equivalence;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Manage policies on switches by subscribing to updates from the
* policy resolver and information about endpoints from the endpoint
// Implementation
// **************
- public class FlowMap{
- private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
-
- public FlowMap() {
- }
-
- public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
- InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
- if(this.flowMap.get(tableIid) == null) {
- this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
- this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
- }
- return this.flowMap.get(tableIid);
- }
-
- public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
- TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
- // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
- List<Flow> flows = tableBuilder.getFlow();
- Set<Equivalence.Wrapper<Flow>> wrappedFlows =
- new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
- Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
-
- if (!wrappedFlows.contains(wFlow)) {
- tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
- } else {
- LOG.debug("Flow already exists in FlowMap - {}", flow);
- }
- }
-
- public void commitToDataStore() {
- if (dataBroker != null) {
- for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
- try {
- /*
- * Get the currently configured flows for
- * this table.
- */
- updateFlowTable(entry);
- } catch (Exception e) {
- LOG.warn("Couldn't read flow table {}", entry.getKey());
- }
- }
- }
- }
-
- private void updateFlowTable(Entry<InstanceIdentifier<Table>,
- TableBuilder> entry) throws Exception {
- // flows to update
- Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
- // flows currently in the table
- Set<Flow> curr = new HashSet<>();
-
- ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
- Optional<Table> r =
- t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
-
- if (r.isPresent()) {
- Table currentTable = r.get();
- curr = new HashSet<>(currentTable.getFlow());
- }
-
- // Sets with custom equivalence rules
- Set<Equivalence.Wrapper<Flow>> oldFlows =
- new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
- Set<Equivalence.Wrapper<Flow>> updatedFlows =
- new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
-
- // what is still there but was not updated, needs to be deleted
- Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
- Sets.difference(oldFlows, updatedFlows);
- // new flows (they were not there before)
- Sets.SetView<Equivalence.Wrapper<Flow>> additions =
- Sets.difference(updatedFlows, oldFlows);
-
- if (!deletions.isEmpty()) {
- for (Equivalence.Wrapper<Flow> wf: deletions) {
- Flow f = wf.get();
- if (f != null) {
- t.delete(LogicalDatastoreType.CONFIGURATION,
- FlowUtils.createFlowPath(entry.getKey(), f.getId()));
- }
- }
- }
- if (!additions.isEmpty()) {
- for (Equivalence.Wrapper<Flow> wf: additions) {
- Flow f = wf.get();
- if (f != null) {
- t.put(LogicalDatastoreType.CONFIGURATION,
- FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
- }
- }
- }
- CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
- Futures.addCallback(f, new FutureCallback<Void>() {
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Could not write flow table {}", t);
- }
-
- @Override
- public void onSuccess(Void result) {
- LOG.debug("Flow table updated.");
- }
- });
- }
-
- }
-
private void scheduleUpdate() {
if (switchManager != null) {
LOG.trace("Scheduling flow update task");
* Update the flows on a particular switch
*/
private class SwitchFlowUpdateTask implements Callable<Void> {
- private FlowMap flowMap;
+ private OfWriter ofWriter;
- public SwitchFlowUpdateTask(FlowMap flowMap) {
+ public SwitchFlowUpdateTask(OfWriter ofWriter) {
super();
- this.flowMap = flowMap;
+ this.ofWriter = ofWriter;
}
@Override
return null;
for (OfTable table : flowPipeline) {
try {
- table.update(node, info, flowMap);
+ table.update(node, info, ofWriter);
} catch (Exception e) {
- LOG.error("Failed to write flow table {}",
+ LOG.error("Failed to write Openflow table {}",
table.getClass().getSimpleName(), e);
}
}
= new ExecutorCompletionService<>(executor);
int n = 0;
- FlowMap flowMap = new FlowMap();
+ OfWriter ofWriter = new OfWriter();
- SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
+ SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
ecs.submit(swut);
n+=1;
for (int i = 0; i < n; i++) {
try {
ecs.take().get();
- flowMap.commitToDataStore();
+ ofWriter.commitToDataStore(dataBroker);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to update flow tables", e);
}