X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=renderers%2Fofoverlay%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fgroupbasedpolicy%2Frenderer%2Fofoverlay%2FPolicyManager.java;h=f50f57f47b324f68f2738cd92c26bd4d7a0a9106;hb=refs%2Fchanges%2F42%2F29642%2F14;hp=90d88c9488ab7f96223b7c7cd59e587a1b986df1;hpb=ced9c03f303c0e301e59437d6cd32f70ced14b1e;p=groupbasedpolicy.git diff --git a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java index 90d88c948..f50f57f47 100755 --- a/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java +++ b/renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java @@ -8,28 +8,35 @@ package org.opendaylight.groupbasedpolicy.renderer.ofoverlay; -import com.google.common.base.Equivalence; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.groupbasedpolicy.endpoint.EpKey; -import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric; +import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper; +import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable; -import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer; @@ -45,32 +52,16 @@ import org.opendaylight.groupbasedpolicy.resolver.PolicyListener; import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver; import org.opendaylight.groupbasedpolicy.resolver.PolicyScope; import org.opendaylight.groupbasedpolicy.util.SingletonTask; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - /** * Manage policies on switches by subscribing to updates from the * policy resolver and information about endpoints from the endpoint @@ -82,15 +73,13 @@ public class PolicyManager LoggerFactory.getLogger(PolicyManager.class); private short tableOffset; - private final short TABLEID_PORTSECURITY = 0; - private final short TABLEID_INGRESS_NAT = (short) (tableOffset+1); - private final short TABLEID_SOURCE_MAPPER = (short) (tableOffset+2); - private final short TABLEID_DESTINATION_MAPPER = (short) (tableOffset+3); - private final short TABLEID_POLICY_ENFORCER = (short) (tableOffset+4); - private final short TABLEID_EGRESS_NAT = (short) (tableOffset+5); - private final short TABLEID_EXTERNAL_MAPPER = (short) (tableOffset+6); - - private static MacAddress externaMacAddress; + private static final short TABLEID_PORTSECURITY = 0; + private static final short TABLEID_INGRESS_NAT = 1; + private static final short TABLEID_SOURCE_MAPPER = 2; + private static final short TABLEID_DESTINATION_MAPPER = 3; + private static final short TABLEID_POLICY_ENFORCER = 4; + private static final short TABLEID_EGRESS_NAT = 5; + private static final short TABLEID_EXTERNAL_MAPPER = 6; private final SwitchManager switchManager; private final PolicyResolver policyResolver; @@ -100,11 +89,11 @@ public class PolicyManager private final ScheduledExecutorService executor; private final SingletonTask flowUpdateTask; private final DataBroker dataBroker; - + private final OfContext ofCtx; /** * The flow tables that make up the processing pipeline */ - private final List flowPipeline; + private List flowPipeline; /** * The delay before triggering the flow update task in response to an @@ -118,44 +107,32 @@ public class PolicyManager EndpointManager endpointManager, RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor, - short tableOffset, - MacAddress externalRouterMac) { + short tableOffset) { super(); this.switchManager = switchManager; this.executor = executor; this.policyResolver = policyResolver; this.dataBroker = dataBroker; - this.tableOffset=tableOffset; - this.externaMacAddress=externalRouterMac; - - - if (dataBroker != null) { - WriteTransaction t = dataBroker.newWriteOnlyTransaction(); - t.put(LogicalDatastoreType.OPERATIONAL, - InstanceIdentifier - .builder(SubjectFeatureDefinitions.class) - .build(), - SubjectFeatures.OF_OVERLAY_FEATURES); - t.submit(); + this.tableOffset = tableOffset; + try { + // to validate against model + verifyMaxTableId(tableOffset); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n." + + "Max. table ID would be out of range. Check config-subsystem.\n{}", e); } + // TODO write capabilities in DS + for(Entry entry : SubjectFeatures.getActions().entrySet()) { policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue()); } - OfContext ctx = new OfContext(dataBroker, rpcRegistry, + ofCtx = new OfContext(dataBroker, rpcRegistry, this, policyResolver, switchManager, endpointManager, executor); - flowPipeline = ImmutableList.of(new PortSecurity(ctx,TABLEID_PORTSECURITY), - new GroupTable(ctx), - new IngressNatMapper(ctx,TABLEID_INGRESS_NAT), - new SourceMapper(ctx,TABLEID_SOURCE_MAPPER), - new DestinationMapper(ctx,TABLEID_DESTINATION_MAPPER), - new PolicyEnforcer(ctx,TABLEID_POLICY_ENFORCER), - new EgressNatMapper(ctx,TABLEID_EGRESS_NAT), - new ExternalMapper(ctx,TABLEID_EXTERNAL_MAPPER) - ); + flowPipeline = createFlowPipeline(); policyScope = policyResolver.registerListener(this); if (switchManager != null) @@ -168,43 +145,137 @@ public class PolicyManager LOG.debug("Initialized OFOverlay policy manager"); } + private List createFlowPipeline() { + // TODO - PORTSECURITY is kept in table 0. + // According to openflow spec,processing on vSwitch always starts from table 0. + // Packets will be droped if table 0 is empty. + // Alternative workaround - table-miss flow entries in table 0. + return ImmutableList.of(new PortSecurity(ofCtx, (short) 0), + new GroupTable(ofCtx), + new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()), + new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()), + new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()), + new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()), + new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()), + new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER()) + ); + } + + /** + * @param tableOffset the new offset value + * @return {@link ListenableFuture} to indicate that tables have been synced + */ + public ListenableFuture changeOpenFlowTableOffset(final short tableOffset) { + try { + verifyMaxTableId(tableOffset); + } catch (IllegalArgumentException e) { + LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e); + // TODO - invalid offset value remains in conf DS + // It's not possible to validate offset value by using constrains in model, + // because number of tables in pipeline varies. + return Futures.immediateFuture(null); + } + List tableIDs = getTableIDs(); + this.tableOffset = tableOffset; + return Futures.transform(removeUnusedTables(tableIDs), new Function() { + + @Override + public Void apply(Void tablesRemoved) { + flowPipeline = createFlowPipeline(); + scheduleUpdate(); + return null; + } + }); + } + + /** + * @param tableIDs - IDs of tables to delete + * @return ListenableFuture - which will be filled when clearing is done + */ + private ListenableFuture removeUnusedTables(final List tableIDs) { + List> checkList = new ArrayList<>(); + final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction(); + for (Short tableId : tableIDs) { + for (NodeId nodeId : switchManager.getReadySwitches()) { + final InstanceIdentifier tablePath = FlowUtils.createTablePath(nodeId, tableId); + checkList.add(deteleTableIfExists(rwTx, tablePath)); + } + } + ListenableFuture> allAsListFuture = Futures.allAsList(checkList); + return Futures.transform(allAsListFuture, new AsyncFunction, Void>() { + + @Override + public ListenableFuture apply(List readyToSubmit) { + return rwTx.submit(); + } + }); + } + + private List getTableIDs() { + List tableIds = new ArrayList<>(); + tableIds.add(getTABLEID_PORTSECURITY()); + tableIds.add(getTABLEID_INGRESS_NAT()); + tableIds.add(getTABLEID_SOURCE_MAPPER()); + tableIds.add(getTABLEID_DESTINATION_MAPPER()); + tableIds.add(getTABLEID_POLICY_ENFORCER()); + tableIds.add(getTABLEID_EGRESS_NAT()); + tableIds.add(getTABLEID_EXTERNAL_MAPPER()); + return tableIds; + } + + private ListenableFuture deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier
tablePath){ + return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function, Void>() { + + @Override + public Void apply(Optional
optTable) { + if(optTable.isPresent()){ + rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath); + } + return null; + }}); + } + // ************** // SwitchListener // ************** - public short getTABLEID_PORTSECURITY() { - return TABLEID_PORTSECURITY; + return (short)(tableOffset+TABLEID_PORTSECURITY); } public short getTABLEID_INGRESS_NAT() { - return TABLEID_INGRESS_NAT; + return (short)(tableOffset+TABLEID_INGRESS_NAT); } public short getTABLEID_SOURCE_MAPPER() { - return TABLEID_SOURCE_MAPPER; + return (short)(tableOffset+TABLEID_SOURCE_MAPPER); } public short getTABLEID_DESTINATION_MAPPER() { - return TABLEID_DESTINATION_MAPPER; + return (short)(tableOffset+TABLEID_DESTINATION_MAPPER); } public short getTABLEID_POLICY_ENFORCER() { - return TABLEID_POLICY_ENFORCER; + return (short)(tableOffset+TABLEID_POLICY_ENFORCER); } public short getTABLEID_EGRESS_NAT() { - return TABLEID_EGRESS_NAT; + return (short)(tableOffset+TABLEID_EGRESS_NAT); } public short getTABLEID_EXTERNAL_MAPPER() { - return TABLEID_EXTERNAL_MAPPER; + return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER); + } + + + public TableId verifyMaxTableId(short tableOffset) { + return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER)); } @Override @@ -264,146 +335,10 @@ public class PolicyManager // No-op for now } - public static MacAddress getExternaMacAddress() { - return externaMacAddress; - } - // ************** // Implementation // ************** - public class FlowMap{ - private ConcurrentMap, TableBuilder> flowMap = new ConcurrentHashMap<>(); - - public FlowMap() { - } - - public TableBuilder getTableForNode(NodeId nodeId, short tableId) { - InstanceIdentifier
tableIid = FlowUtils.createTablePath(nodeId, tableId); - if(this.flowMap.get(tableIid) == null) { - this.flowMap.put(tableIid, new TableBuilder().setId(tableId)); - this.flowMap.get(tableIid).setFlow(new ArrayList()); - } - return this.flowMap.get(tableIid); - } - - public void writeFlow(NodeId nodeId, short tableId, Flow flow) { - TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId); - // transforming List to Set (with customized equals/hashCode) to eliminate duplicate entries - List flows = tableBuilder.getFlow(); - Set> wrappedFlows = - new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); - - Equivalence.Wrapper wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow); - - if (!wrappedFlows.contains(wFlow)) { - tableBuilder.getFlow().add(Preconditions.checkNotNull(flow)); - } - } - - public void commitToDataStore() { - if (dataBroker != null) { - for( Entry, TableBuilder> entry : flowMap.entrySet()) { - try { - /* - * Get the currently configured flows for - * this table. - */ - updateFlowTable(entry); - } catch (Exception e) { - LOG.warn("Couldn't read flow table {}", entry.getKey()); - } - } - } - } - - private void updateFlowTable(Entry, - TableBuilder> entry) throws Exception { - // flows to update - Set update = new HashSet<>(entry.getValue().getFlow()); - // flows currently in the table - Set curr = new HashSet<>(); - - ReadWriteTransaction t = dataBroker.newReadWriteTransaction(); - Optional
r = - t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get(); - - if (r.isPresent()) { - Table currentTable = r.get(); - curr = new HashSet<>(currentTable.getFlow()); - } - - // Sets with custom equivalence rules - Set> oldFlows = - new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); - Set> updatedFlows = - new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION)); - - // what is still there but was not updated, needs to be deleted - Sets.SetView> deletions = - Sets.difference(oldFlows, updatedFlows); - // new flows (they were not there before) - Sets.SetView> additions = - Sets.difference(updatedFlows, oldFlows); - - if (!deletions.isEmpty()) { - for (Equivalence.Wrapper wf: deletions) { - Flow f = wf.get(); - if (f != null) { - t.delete(LogicalDatastoreType.CONFIGURATION, - FlowUtils.createFlowPath(entry.getKey(), f.getId())); - } - } - } - if (!additions.isEmpty()) { - for (Equivalence.Wrapper wf: additions) { - Flow f = wf.get(); - if (f != null) { - t.put(LogicalDatastoreType.CONFIGURATION, - FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true); - } - } - } - CheckedFuture f = t.submit(); - Futures.addCallback(f, new FutureCallback() { - @Override - public void onFailure(Throwable t) { - LOG.error("Could not write flow table {}", t); - } - - @Override - public void onSuccess(Void result) { - LOG.debug("Flow table updated."); - } - }); - } - - private void purgeFromDataStore() { - // TODO: tbachman: Remove for Lithium -- this is a workaround - // where some flow-mods aren't getting installed - // on vSwitches when changing L3 contexts - WriteTransaction d = dataBroker.newWriteOnlyTransaction(); - - for( Entry, TableBuilder> entry : flowMap.entrySet()) { - d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey()); - } - - CheckedFuture fu = d.submit(); - Futures.addCallback(fu, new FutureCallback() { - @Override - public void onFailure(Throwable th) { - LOG.error("Could not write flow table.", th); - } - - @Override - public void onSuccess(Void result) { - LOG.debug("Flow table updated."); - } - }); - } - - } - private void scheduleUpdate() { if (switchManager != null) { LOG.trace("Scheduling flow update task"); @@ -415,11 +350,11 @@ public class PolicyManager * Update the flows on a particular switch */ private class SwitchFlowUpdateTask implements Callable { - private FlowMap flowMap; + private OfWriter ofWriter; - public SwitchFlowUpdateTask(FlowMap flowMap) { + public SwitchFlowUpdateTask(OfWriter ofWriter) { super(); - this.flowMap = flowMap; + this.ofWriter = ofWriter; } @Override @@ -430,9 +365,9 @@ public class PolicyManager return null; for (OfTable table : flowPipeline) { try { - table.update(node, info, flowMap); + table.update(node, info, ofWriter); } catch (Exception e) { - LOG.error("Failed to write flow table {}", + LOG.error("Failed to write Openflow table {}", table.getClass().getSimpleName(), e); } } @@ -454,16 +389,16 @@ public class PolicyManager = new ExecutorCompletionService<>(executor); int n = 0; - FlowMap flowMap = new FlowMap(); + OfWriter ofWriter = new OfWriter(); - SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap); + SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter); ecs.submit(swut); n+=1; for (int i = 0; i < n; i++) { try { ecs.take().get(); - flowMap.commitToDataStore(); + ofWriter.commitToDataStore(dataBroker); } catch (InterruptedException | ExecutionException e) { LOG.error("Failed to update flow tables", e); }