/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.groupbasedpolicy.renderer.ofoverlay; import com.google.common.base.Equivalence; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.groupbasedpolicy.endpoint.EpKey; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable; import 
org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action; import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures; import org.opendaylight.groupbasedpolicy.resolver.EgKey; import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo; import org.opendaylight.groupbasedpolicy.resolver.PolicyListener; import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver; import org.opendaylight.groupbasedpolicy.resolver.PolicyScope; import org.opendaylight.groupbasedpolicy.util.SingletonTask; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode; import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.Set; import 
java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Manage policies on switches by subscribing to updates from the
 * policy resolver and information about endpoints from the endpoint
 * registry
 */
public class PolicyManager
        implements SwitchListener, PolicyListener, EndpointListener {
    private static final Logger LOG =
            LoggerFactory.getLogger(PolicyManager.class);

    // Base OpenFlow table id; the pipeline tables below are laid out at
    // fixed offsets from it.
    private final short tableOffset;

    // Port security is always table 0, independent of the offset.
    private final short TABLEID_PORTSECURITY = 0;
    // NOTE: these depend on the tableOffset constructor argument and are
    // therefore assigned in the constructor. As field initializers they
    // would run BEFORE the constructor body sets tableOffset and always
    // evaluate with tableOffset == 0, silently ignoring the configured
    // offset (the bug in the previous revision).
    private final short TABLEID_INGRESS_NAT;
    private final short TABLEID_SOURCE_MAPPER;
    private final short TABLEID_DESTINATION_MAPPER;
    private final short TABLEID_POLICY_ENFORCER;
    private final short TABLEID_EGRESS_NAT;
    private final short TABLEID_EXTERNAL_MAPPER;

    // MAC of the external router; set once from the constructor argument
    // and exposed via the static getter below.
    private static MacAddress externaMacAddress;

    private final SwitchManager switchManager;
    private final PolicyResolver policyResolver;
    private final PolicyScope policyScope;
    private final ScheduledExecutorService executor;
    private final SingletonTask flowUpdateTask;
    private final DataBroker dataBroker;

    /**
     * The flow tables that make up the processing pipeline
     */
    private final List<? extends OfTable> flowPipeline;

    /**
     * The delay before triggering the flow update task in response to an
     * event in milliseconds.
     */
    private final static int FLOW_UPDATE_DELAY = 250;

    public PolicyManager(DataBroker dataBroker,
                         PolicyResolver policyResolver,
                         SwitchManager switchManager,
                         EndpointManager endpointManager,
                         RpcProviderRegistry rpcRegistry,
                         ScheduledExecutorService executor,
                         short tableOffset,
                         MacAddress externalRouterMac) {
        super();
        this.switchManager = switchManager;
        this.executor = executor;
        this.policyResolver = policyResolver;
        this.dataBroker = dataBroker;
        this.tableOffset = tableOffset;
        // Assigned here (not as field initializers) so the configured
        // tableOffset is actually honored -- see note on the fields.
        TABLEID_INGRESS_NAT = (short) (tableOffset + 1);
        TABLEID_SOURCE_MAPPER = (short) (tableOffset + 2);
        TABLEID_DESTINATION_MAPPER = (short) (tableOffset + 3);
        TABLEID_POLICY_ENFORCER = (short) (tableOffset + 4);
        TABLEID_EGRESS_NAT = (short) (tableOffset + 5);
        TABLEID_EXTERNAL_MAPPER = (short) (tableOffset + 6);
        PolicyManager.externaMacAddress = externalRouterMac;

        if (dataBroker != null) {
            // Advertise the subject-feature definitions this renderer
            // supports in the operational datastore.
            WriteTransaction t = dataBroker.newWriteOnlyTransaction();
            t.put(LogicalDatastoreType.OPERATIONAL,
                    InstanceIdentifier
                        .builder(SubjectFeatureDefinitions.class)
                        .build(),
                    SubjectFeatures.OF_OVERLAY_FEATURES);
            t.submit();
        }

        for (Entry<ActionDefinitionId, Action> entry :
                SubjectFeatures.getActions().entrySet()) {
            policyResolver.registerActionDefinitions(entry.getKey(),
                                                     entry.getValue());
        }

        OfContext ctx = new OfContext(dataBroker, rpcRegistry,
                                      this, policyResolver, switchManager,
                                      endpointManager, executor);
        // Order matters: this is the OpenFlow processing pipeline.
        flowPipeline = ImmutableList.of(
                new PortSecurity(ctx, TABLEID_PORTSECURITY),
                new GroupTable(ctx),
                new IngressNatMapper(ctx, TABLEID_INGRESS_NAT),
                new SourceMapper(ctx, TABLEID_SOURCE_MAPPER),
                new DestinationMapper(ctx, TABLEID_DESTINATION_MAPPER),
                new PolicyEnforcer(ctx, TABLEID_POLICY_ENFORCER),
                new EgressNatMapper(ctx, TABLEID_EGRESS_NAT),
                new ExternalMapper(ctx, TABLEID_EXTERNAL_MAPPER));

        policyScope = policyResolver.registerListener(this);
        if (switchManager != null)
            switchManager.registerListener(this);
        endpointManager.registerListener(this);

        flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
        scheduleUpdate();

        LOG.debug("Initialized OFOverlay policy manager");
    }

    // **************
    // SwitchListener
    // **************

    public short getTABLEID_PORTSECURITY() {
        return TABLEID_PORTSECURITY;
    }

    public short getTABLEID_INGRESS_NAT() {
        return TABLEID_INGRESS_NAT;
    }

    public short getTABLEID_SOURCE_MAPPER() {
        return TABLEID_SOURCE_MAPPER;
    }

    public short getTABLEID_DESTINATION_MAPPER() {
        return TABLEID_DESTINATION_MAPPER;
    }

    public short getTABLEID_POLICY_ENFORCER() {
        return TABLEID_POLICY_ENFORCER;
    }

    public short getTABLEID_EGRESS_NAT() {
        return TABLEID_EGRESS_NAT;
    }

    public short getTABLEID_EXTERNAL_MAPPER() {
        return TABLEID_EXTERNAL_MAPPER;
    }

    @Override
    public void switchReady(final NodeId nodeId) {
        scheduleUpdate();
    }

    @Override
    public void switchRemoved(NodeId sw) {
        // XXX TODO purge switch flows
        scheduleUpdate();
    }

    @Override
    public void switchUpdated(NodeId sw) {
        scheduleUpdate();
    }

    // ****************
    // EndpointListener
    // ****************

    @Override
    public void endpointUpdated(EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
        scheduleUpdate();
    }

    // **************
    // PolicyListener
    // **************

    @Override
    public void policyUpdated(Set<EgKey> updatedConsumers) {
        scheduleUpdate();
    }

    // *************
    // PolicyManager
    // *************

    /**
     * Set the learning mode to the specified value
     * @param learningMode the learning mode to set
     */
    public void setLearningMode(LearningMode learningMode) {
        // No-op for now
    }

    public static MacAddress getExternaMacAddress() {
        return externaMacAddress;
    }

    // **************
    // Implementation
    // **************

    /**
     * Accumulates the flows the pipeline wants installed, keyed by table
     * instance identifier, and reconciles them against the configuration
     * datastore on commit.
     */
    public class FlowMap {
        private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap =
                new ConcurrentHashMap<>();

        public FlowMap() {
        }

        /**
         * Get (creating on first access) the table builder for the given
         * node and table id.
         */
        public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
            InstanceIdentifier<Table> tableIid =
                    FlowUtils.createTablePath(nodeId, tableId);
            if (this.flowMap.get(tableIid) == null) {
                this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
                this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
            }
            return this.flowMap.get(tableIid);
        }

        /**
         * Add a flow to the given node/table, ignoring duplicates.
         */
        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
            TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
            // transforming List<Flow> to Set (with customized
            // equals/hashCode) to eliminate duplicate entries
            List<Flow> flows = tableBuilder.getFlow();
            Set<Equivalence.Wrapper<Flow>> wrappedFlows = new HashSet<>(
                    Collections2.transform(flows,
                            EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
            Equivalence.Wrapper<Flow> wFlow =
                    EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
            if (!wrappedFlows.contains(wFlow)) {
                tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
            }
        }

        /**
         * Reconcile every accumulated table against the configuration
         * datastore. Failures on one table are logged and do not prevent
         * the remaining tables from being processed.
         */
        public void commitToDataStore() {
            if (dataBroker != null) {
                for (Entry<InstanceIdentifier<Table>, TableBuilder> entry :
                        flowMap.entrySet()) {
                    try {
                        /*
                         * Get the currently configured flows for
                         * this table.
                         */
                        updateFlowTable(entry);
                    } catch (Exception e) {
                        LOG.warn("Couldn't read flow table {}",
                                 entry.getKey(), e);
                    }
                }
            }
        }

        /**
         * Diff the desired flows for one table against what is currently
         * configured, then delete stale flows and put new ones in a single
         * transaction.
         */
        private void updateFlowTable(
                Entry<InstanceIdentifier<Table>, TableBuilder> entry)
                throws Exception {
            // flows to update
            Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
            // flows currently in the table
            Set<Flow> curr = new HashSet<>();

            ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
            Optional<Table> r = t.read(LogicalDatastoreType.CONFIGURATION,
                                       entry.getKey()).get();
            if (r.isPresent()) {
                Table currentTable = r.get();
                curr = new HashSet<>(currentTable.getFlow());
            }

            // Sets with custom equivalence rules
            Set<Equivalence.Wrapper<Flow>> oldFlows = new HashSet<>(
                    Collections2.transform(curr,
                            EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
            Set<Equivalence.Wrapper<Flow>> updatedFlows = new HashSet<>(
                    Collections2.transform(update,
                            EquivalenceFabric.FLOW_WRAPPER_FUNCTION));

            // what is still there but was not updated, needs to be deleted
            Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
                    Sets.difference(oldFlows, updatedFlows);
            // new flows (they were not there before)
            Sets.SetView<Equivalence.Wrapper<Flow>> additions =
                    Sets.difference(updatedFlows, oldFlows);

            if (!deletions.isEmpty()) {
                for (Equivalence.Wrapper<Flow> wf : deletions) {
                    Flow f = wf.get();
                    if (f != null) {
                        t.delete(LogicalDatastoreType.CONFIGURATION,
                                 FlowUtils.createFlowPath(entry.getKey(),
                                                          f.getId()));
                    }
                }
            }
            if (!additions.isEmpty()) {
                for (Equivalence.Wrapper<Flow> wf : additions) {
                    Flow f = wf.get();
                    if (f != null) {
                        // createMissingParents=true so the table node is
                        // created if it does not exist yet
                        t.put(LogicalDatastoreType.CONFIGURATION,
                              FlowUtils.createFlowPath(entry.getKey(),
                                                       f.getId()),
                              f, true);
                    }
                }
            }

            CheckedFuture<Void, TransactionCommitFailedException> f =
                    t.submit();
            Futures.addCallback(f, new FutureCallback<Void>() {
                @Override
                public void onFailure(Throwable t) {
                    // The sole Throwable argument is consumed as the
                    // exception, so the message must not carry a "{}"
                    // placeholder (it would be left unsubstituted).
                    LOG.error("Could not write flow table", t);
                }

                @Override
                public void onSuccess(Void result) {
                    LOG.debug("Flow table updated.");
                }
            });
        }

        private void purgeFromDataStore() {
            // TODO: tbachman: Remove for Lithium -- this is a workaround
            //       where some flow-mods aren't getting installed
            //       on vSwitches when changing L3 contexts
            WriteTransaction d = dataBroker.newWriteOnlyTransaction();

            for (Entry<InstanceIdentifier<Table>, TableBuilder> entry :
                    flowMap.entrySet()) {
                d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
            }

            CheckedFuture<Void, TransactionCommitFailedException> fu =
                    d.submit();
            Futures.addCallback(fu, new FutureCallback<Void>() {
                @Override
                public void onFailure(Throwable th) {
                    LOG.error("Could not write flow table.", th);
                }

                @Override
                public void onSuccess(Void result) {
                    LOG.debug("Flow table updated.");
                }
            });
        }
    }

    /**
     * Debounce events: (re)arm the singleton flow-update task so a burst
     * of updates results in a single recomputation after FLOW_UPDATE_DELAY.
     */
    private void scheduleUpdate() {
        if (switchManager != null) {
            LOG.trace("Scheduling flow update task");
            flowUpdateTask.reschedule(FLOW_UPDATE_DELAY,
                                      TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Update the flows on a particular switch
     */
    private class SwitchFlowUpdateTask implements Callable<Void> {
        private FlowMap flowMap;

        public SwitchFlowUpdateTask(FlowMap flowMap) {
            super();
            this.flowMap = flowMap;
        }

        @Override
        public Void call() throws Exception {
            for (NodeId node : switchManager.getReadySwitches()) {
                PolicyInfo info = policyResolver.getCurrentPolicy();
                if (info == null)
                    return null;
                for (OfTable table : flowPipeline) {
                    try {
                        table.update(node, info, flowMap);
                    } catch (Exception e) {
                        LOG.error("Failed to write flow table {}",
                                  table.getClass().getSimpleName(), e);
                    }
                }
            }
            return null;
        }
    }

    /**
     * Update all flows on all switches as needed.  Note that this will block
     * one of the threads on the executor.
     */
    private class FlowUpdateTask implements Runnable {
        @Override
        public void run() {
            LOG.debug("Beginning flow update task");

            CompletionService<Void> ecs =
                    new ExecutorCompletionService<>(executor);
            int n = 0;

            FlowMap flowMap = new FlowMap();

            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
            ecs.submit(swut);
            n += 1;

            for (int i = 0; i < n; i++) {
                try {
                    ecs.take().get();
                    flowMap.commitToDataStore();
                } catch (InterruptedException | ExecutionException e) {
                    LOG.error("Failed to update flow tables", e);
                }
            }
            LOG.debug("Flow update completed");
        }
    }
}