X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=renderers%2Fofoverlay%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fgroupbasedpolicy%2Frenderer%2Fofoverlay%2FPolicyManager.java;h=f50f57f47b324f68f2738cd92c26bd4d7a0a9106;hb=refs%2Fchanges%2F42%2F29642%2F14;hp=0a5c1d2d6b85fb536e0f5c21b51de9e52c5ddd12;hpb=599612934795eef713370dec623864a12fb321fb;p=groupbasedpolicy.git

diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
old mode 100644
new mode 100755
index 0a5c1d2d6..f50f57f47
--- a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
+++ b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
@@ -14,26 +14,37 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
+import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
@@ -42,20 +53,15 @@ import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
 /**
  * Manage policies on switches by subscribing to updates from the
  * policy resolver and information about endpoints from the endpoint
@@ -66,6 +72,15 @@ public class PolicyManager
 
     private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
 
+    private short tableOffset;
+    private static final short TABLEID_PORTSECURITY = 0;
+    private static final short TABLEID_INGRESS_NAT = 1;
+    private static final short TABLEID_SOURCE_MAPPER = 2;
+    private static final short TABLEID_DESTINATION_MAPPER = 3;
+    private static final short TABLEID_POLICY_ENFORCER = 4;
+    private static final short TABLEID_EGRESS_NAT = 5;
+    private static final short TABLEID_EXTERNAL_MAPPER = 6;
+
     private final SwitchManager switchManager;
     private final PolicyResolver policyResolver;
 
@@ -74,11 +89,11 @@ public class PolicyManager
     private final ScheduledExecutorService executor;
     private final SingletonTask flowUpdateTask;
     private final DataBroker dataBroker;
-
+    private final OfContext ofCtx;
     /**
      * The flow tables that make up the processing pipeline
     */
-    private final List<? extends OfTable> flowPipeline;
+    private List<? extends OfTable> flowPipeline;
 
     /**
     * The delay before triggering the flow update task in response to an
@@ -86,39 +101,38 @@ public class PolicyManager
     */
    private final static int FLOW_UPDATE_DELAY = 250;
 
-
-
    public PolicyManager(DataBroker dataBroker,
                         PolicyResolver policyResolver,
                         SwitchManager switchManager,
                         EndpointManager endpointManager,
                         RpcProviderRegistry rpcRegistry,
-                        ScheduledExecutorService executor) {
+                        ScheduledExecutorService executor,
+                        short tableOffset) {
        super();
        this.switchManager = switchManager;
        this.executor = executor;
        this.policyResolver = policyResolver;
        this.dataBroker = dataBroker;
+       this.tableOffset = tableOffset;
+       try {
+           // to validate against model
+           verifyMaxTableId(tableOffset);
+       } catch (IllegalArgumentException e) {
+           throw new IllegalArgumentException("Failed to start OF-Overlay renderer.\n" + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
+       }
+       // TODO write capabilities in DS
-       if (dataBroker != null) {
-           WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-           t.put(LogicalDatastoreType.OPERATIONAL,
-                 InstanceIdentifier
-                     .builder(SubjectFeatureDefinitions.class)
-                     .build(),
-                 SubjectFeatures.OF_OVERLAY_FEATURES);
-           t.submit();
+       for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
+           policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
        }
 
-       OfContext ctx = new OfContext(dataBroker, rpcRegistry,
+       ofCtx = new OfContext(dataBroker, rpcRegistry,
                                      this, policyResolver, switchManager, endpointManager, executor);
 
-       flowPipeline = ImmutableList.of(new PortSecurity(ctx),
-                                       new GroupTable(ctx),
-                                       new SourceMapper(ctx),
-                                       new DestinationMapper(ctx),
-                                       new PolicyEnforcer(ctx));
+
+       flowPipeline = createFlowPipeline();
 
        policyScope = policyResolver.registerListener(this);
        if (switchManager != null)
@@ -131,38 +145,142 @@ public class PolicyManager
        LOG.debug("Initialized OFOverlay policy manager");
    }
 
+   private List<? extends OfTable> createFlowPipeline() {
+       // TODO - PORTSECURITY is kept in table 0.
+       // According to the OpenFlow spec, processing on a vSwitch always starts from table 0.
+       // Packets will be dropped if table 0 is empty.
+       // Alternative workaround - table-miss flow entries in table 0.
+       return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
+                               new GroupTable(ofCtx),
+                               new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
+                               new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
+                               new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
+                               new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
+                               new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
+                               new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
+                               );
+   }
+
+   /**
+    * @param tableOffset the new offset value
+    * @return {@link ListenableFuture} to indicate that tables have been synced
+    */
+   public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
+       try {
+           verifyMaxTableId(tableOffset);
+       } catch (IllegalArgumentException e) {
+           LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
+           // TODO - invalid offset value remains in conf DS
+           // It's not possible to validate offset value by using constraints in model,
+           // because number of tables in pipeline varies.
+           return Futures.immediateFuture(null);
+       }
+       List<Short> tableIDs = getTableIDs();
+       this.tableOffset = tableOffset;
+       return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
+
+           @Override
+           public Void apply(Void tablesRemoved) {
+               flowPipeline = createFlowPipeline();
+               scheduleUpdate();
+               return null;
+           }
+       });
+   }
+
+   /**
+    * @param tableIDs - IDs of tables to delete
+    * @return ListenableFuture - which will be filled when clearing is done
+    */
+   private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
+       List<ListenableFuture<Void>> checkList = new ArrayList<>();
+       final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
+       for (Short tableId : tableIDs) {
+           for (NodeId nodeId : switchManager.getReadySwitches()) {
+               final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
+               checkList.add(deteleTableIfExists(rwTx, tablePath));
+           }
+       }
+       ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
+       return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
+
+           @Override
+           public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
+               return rwTx.submit();
+           }
+       });
+   }
+
+   private List<Short> getTableIDs() {
+       List<Short> tableIds = new ArrayList<>();
+       tableIds.add(getTABLEID_PORTSECURITY());
+       tableIds.add(getTABLEID_INGRESS_NAT());
+       tableIds.add(getTABLEID_SOURCE_MAPPER());
+       tableIds.add(getTABLEID_DESTINATION_MAPPER());
+       tableIds.add(getTABLEID_POLICY_ENFORCER());
+       tableIds.add(getTABLEID_EGRESS_NAT());
+       tableIds.add(getTABLEID_EXTERNAL_MAPPER());
+       return tableIds;
+   }
+
+   private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
+       return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
+
+           @Override
+           public Void apply(Optional<Table> optTable) {
+               if(optTable.isPresent()){
+                   rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
+               }
+               return null;
+           }});
+   }
+
    // **************
    // SwitchListener
    // **************
+   public short getTABLEID_PORTSECURITY() {
+       return (short)(tableOffset+TABLEID_PORTSECURITY);
+   }
+
+
+   public short getTABLEID_INGRESS_NAT() {
+       return (short)(tableOffset+TABLEID_INGRESS_NAT);
+   }
+
+
+   public short getTABLEID_SOURCE_MAPPER() {
+       return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
+   }
+
+
+   public short getTABLEID_DESTINATION_MAPPER() {
+       return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
+   }
+
+
+   public short getTABLEID_POLICY_ENFORCER() {
+       return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
+   }
+
+
+   public short getTABLEID_EGRESS_NAT() {
+       return (short)(tableOffset+TABLEID_EGRESS_NAT);
+   }
+
+
+   public short getTABLEID_EXTERNAL_MAPPER() {
+       return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
+   }
+
+
+   public TableId verifyMaxTableId(short tableOffset) {
+       return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
+   }
+
    @Override
    public void switchReady(final NodeId nodeId) {
-       //TODO Apr15 alagalah : OVSDB CRUD tunnels may go here.
-//        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-//
-//        NodeBuilder nb = new NodeBuilder()
-//            .setId(nodeId)
-//            .addAugmentation(FlowCapableNode.class,
-//                             new FlowCapableNodeBuilder()
-//                                .build());
-//        t.merge(LogicalDatastoreType.CONFIGURATION,
-//                FlowUtils.createNodePath(nodeId),
-//                nb.build(), true);
-//        ListenableFuture<Void> result = t.submit();
-//        Futures.addCallback(result,
-//                            new FutureCallback<Void>() {
-//            @Override
-//            public void onSuccess(Void result) {
-//                dirty.get().addNode(nodeId);
-//                scheduleUpdate();
-//            }
-//
-//            @Override
-//            public void onFailure(Throwable t) {
-//                LOG.error("Could not add switch {}", nodeId, t);
-//            }
-//        });
-
+       scheduleUpdate();
    }
 
    @Override
@@ -217,60 +335,10 @@ public class PolicyManager
        // No-op for now
    }
 
-
-
    // **************
    // Implementation
    // **************
 
-   public class FlowMap{
-       private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
-
-       public FlowMap() {
-       }
-
-       public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
-           InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
-           if(this.flowMap.get(tableIid) == null) {
-               this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
-               this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
-           }
-           return this.flowMap.get(tableIid);
-       }
-
-       public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
-           TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
-           if (!tableBuilder.getFlow().contains(flow)) {
-               tableBuilder.getFlow().add(flow);
-           }
-       }
-
-       public void commitToDataStore() {
-           if (dataBroker != null) {
-               WriteTransaction t = dataBroker.newWriteOnlyTransaction();
-
-               for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
-                   t.put(LogicalDatastoreType.CONFIGURATION,
-                         entry.getKey(), entry.getValue().build(),true);
-               }
-
-               CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
-               Futures.addCallback(f, new FutureCallback<Void>() {
-                   @Override
-                   public void onFailure(Throwable t) {
-                       LOG.error("Could not write flow table.", t);
-                   }
-
-                   @Override
-                   public void onSuccess(Void result) {
-                       LOG.debug("Flow table updated.");
-                   }
-               });
-           }
-       }
-
-   }
-
    private void scheduleUpdate() {
        if (switchManager != null) {
            LOG.trace("Scheduling flow update task");
@@ -282,26 +350,24 @@ public class PolicyManager
     * Update the flows on a particular switch
     */
    private class SwitchFlowUpdateTask implements Callable<Void> {
-       private FlowMap flowMap;
+       private OfWriter ofWriter;
 
-       public SwitchFlowUpdateTask(FlowMap flowMap) {
+       public SwitchFlowUpdateTask(OfWriter ofWriter) {
            super();
-           this.flowMap = flowMap;
+           this.ofWriter = ofWriter;
        }
 
        @Override
        public Void call() throws Exception {
            for (NodeId node : switchManager.getReadySwitches()) {
-               if (!switchManager.isSwitchReady(node))
-                   return null;
                PolicyInfo info = policyResolver.getCurrentPolicy();
                if (info == null)
                    return null;
                for (OfTable table : flowPipeline) {
                    try {
-                       table.update(node, info, flowMap);
+                       table.update(node, info, ofWriter);
                    } catch (Exception e) {
-                       LOG.error("Failed to write flow table {}",
+                       LOG.error("Failed to write Openflow table {}",
                                  table.getClass().getSimpleName(), e);
                    }
                }
@@ -320,19 +386,19 @@ public class PolicyManager
            LOG.debug("Beginning flow update task");
 
            CompletionService<Void> ecs
-               = new ExecutorCompletionService<Void>(executor);
+               = new ExecutorCompletionService<>(executor);
            int n = 0;
 
-           FlowMap flowMap = new FlowMap();
+           OfWriter ofWriter = new OfWriter();
 
-           SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
+           SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
            ecs.submit(swut);
            n+=1;
 
            for (int i = 0; i < n; i++) {
                try {
                    ecs.take().get();
-                   flowMap.commitToDataStore();
+                   ofWriter.commitToDataStore(dataBroker);
                } catch (InterruptedException | ExecutionException e) {
                    LOG.error("Failed to update flow tables", e);
                }
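
For context, a minimal standalone Java sketch (not part of the patch above) of the offset arithmetic behind the new getTABLEID_* accessors and the range check that verifyMaxTableId() delegates to the generated TableId constructor (a uint8 in the table-types model, so 0..255). All class and method names in the sketch are illustrative, not taken from the codebase.

// Illustrative only: each logical table keeps a fixed relative position,
// the configured offset shifts the whole pipeline, and the highest
// resulting ID must remain a valid OpenFlow table id.
public class TableOffsetSketch {

    // Relative positions, mirroring the constants added in PolicyManager.
    static final short PORTSECURITY = 0;
    static final short SOURCE_MAPPER = 2;
    static final short EXTERNAL_MAPPER = 6;

    private final short tableOffset;

    TableOffsetSketch(short tableOffset) {
        // Fail fast if the last table in the pipeline would leave the uint8 range;
        // the patch does this via verifyMaxTableId(), whose new TableId(...) call
        // throws IllegalArgumentException for out-of-range values.
        short max = (short) (tableOffset + EXTERNAL_MAPPER);
        if (max < 0 || max > 255) {
            throw new IllegalArgumentException("Max. table ID " + max + " out of range [0..255]");
        }
        this.tableOffset = tableOffset;
    }

    short sourceMapperTableId() {
        // Same arithmetic as getTABLEID_SOURCE_MAPPER(): offset + relative position.
        return (short) (tableOffset + SOURCE_MAPPER);
    }

    public static void main(String[] args) {
        TableOffsetSketch ids = new TableOffsetSketch((short) 10);
        System.out.println(ids.sourceMapperTableId()); // prints 12
        // new TableOffsetSketch((short) 300) would throw IllegalArgumentException
    }
}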