2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
42 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
47 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import java.util.ArrayList;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Map.Entry;
65 import java.util.concurrent.Callable;
66 import java.util.concurrent.CompletionService;
67 import java.util.concurrent.ConcurrentHashMap;
68 import java.util.concurrent.ConcurrentMap;
69 import java.util.concurrent.ExecutionException;
70 import java.util.concurrent.ExecutorCompletionService;
71 import java.util.concurrent.ScheduledExecutorService;
72 import java.util.concurrent.TimeUnit;
75 * Manage policies on switches by subscribing to updates from the
76 * policy resolver and information about endpoints from the endpoint
79 public class PolicyManager
80 implements SwitchListener, PolicyListener, EndpointListener {
81 private static final Logger LOG =
82 LoggerFactory.getLogger(PolicyManager.class);
84 private short tableOffset;
85 private final short TABLEID_PORTSECURITY = 0;
86 private final short TABLEID_INGRESS_NAT = (short) (tableOffset+1);
87 private final short TABLEID_SOURCE_MAPPER = (short) (tableOffset+2);
88 private final short TABLEID_DESTINATION_MAPPER = (short) (tableOffset+3);
89 private final short TABLEID_POLICY_ENFORCER = (short) (tableOffset+4);
90 private final short TABLEID_EGRESS_NAT = (short) (tableOffset+5);
91 private final short TABLEID_EXTERNAL_MAPPER = (short) (tableOffset+6);
93 private static MacAddress externaMacAddress;
95 private final SwitchManager switchManager;
96 private final PolicyResolver policyResolver;
98 private final PolicyScope policyScope;
100 private final ScheduledExecutorService executor;
101 private final SingletonTask flowUpdateTask;
102 private final DataBroker dataBroker;
105 * The flow tables that make up the processing pipeline
107 private final List<? extends OfTable> flowPipeline;
110 * The delay before triggering the flow update task in response to an
111 * event in milliseconds.
113 private final static int FLOW_UPDATE_DELAY = 250;
115 public PolicyManager(DataBroker dataBroker,
116 PolicyResolver policyResolver,
117 SwitchManager switchManager,
118 EndpointManager endpointManager,
119 RpcProviderRegistry rpcRegistry,
120 ScheduledExecutorService executor,
122 MacAddress externalRouterMac) {
124 this.switchManager = switchManager;
125 this.executor = executor;
126 this.policyResolver = policyResolver;
127 this.dataBroker = dataBroker;
128 this.tableOffset=tableOffset;
129 this.externaMacAddress=externalRouterMac;
132 if (dataBroker != null) {
133 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
134 t.put(LogicalDatastoreType.OPERATIONAL,
136 .builder(SubjectFeatureDefinitions.class)
138 SubjectFeatures.OF_OVERLAY_FEATURES);
142 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
143 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
146 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
147 this, policyResolver, switchManager,
148 endpointManager, executor);
150 flowPipeline = ImmutableList.of(new PortSecurity(ctx,TABLEID_PORTSECURITY),
152 new IngressNatMapper(ctx,TABLEID_INGRESS_NAT),
153 new SourceMapper(ctx,TABLEID_SOURCE_MAPPER),
154 new DestinationMapper(ctx,TABLEID_DESTINATION_MAPPER),
155 new PolicyEnforcer(ctx,TABLEID_POLICY_ENFORCER),
156 new EgressNatMapper(ctx,TABLEID_EGRESS_NAT),
157 new ExternalMapper(ctx,TABLEID_EXTERNAL_MAPPER)
160 policyScope = policyResolver.registerListener(this);
161 if (switchManager != null)
162 switchManager.registerListener(this);
163 endpointManager.registerListener(this);
165 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
168 LOG.debug("Initialized OFOverlay policy manager");
176 public short getTABLEID_PORTSECURITY() {
177 return TABLEID_PORTSECURITY;
181 public short getTABLEID_INGRESS_NAT() {
182 return TABLEID_INGRESS_NAT;
186 public short getTABLEID_SOURCE_MAPPER() {
187 return TABLEID_SOURCE_MAPPER;
191 public short getTABLEID_DESTINATION_MAPPER() {
192 return TABLEID_DESTINATION_MAPPER;
196 public short getTABLEID_POLICY_ENFORCER() {
197 return TABLEID_POLICY_ENFORCER;
201 public short getTABLEID_EGRESS_NAT() {
202 return TABLEID_EGRESS_NAT;
206 public short getTABLEID_EXTERNAL_MAPPER() {
207 return TABLEID_EXTERNAL_MAPPER;
211 public void switchReady(final NodeId nodeId) {
216 public void switchRemoved(NodeId sw) {
217 // XXX TODO purge switch flows
222 public void switchUpdated(NodeId sw) {
231 public void endpointUpdated(EpKey epKey) {
236 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
241 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
242 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
251 public void policyUpdated(Set<EgKey> updatedConsumers) {
260 * Set the learning mode to the specified value
261 * @param learningMode the learning mode to set
263 public void setLearningMode(LearningMode learningMode) {
267 public static MacAddress getExternaMacAddress() {
268 return externaMacAddress;
275 public class FlowMap{
276 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
281 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
282 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
283 if(this.flowMap.get(tableIid) == null) {
284 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
285 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
287 return this.flowMap.get(tableIid);
290 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
291 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
292 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
293 List<Flow> flows = tableBuilder.getFlow();
294 Set<Equivalence.Wrapper<Flow>> wrappedFlows =
295 new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
297 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
299 if (!wrappedFlows.contains(wFlow)) {
300 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
302 LOG.debug("Flow already exists in FlowMap - {}", flow);
306 public void commitToDataStore() {
307 if (dataBroker != null) {
308 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
311 * Get the currently configured flows for
314 updateFlowTable(entry);
315 } catch (Exception e) {
316 LOG.warn("Couldn't read flow table {}", entry.getKey());
322 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
323 TableBuilder> entry) throws Exception {
325 Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
326 // flows currently in the table
327 Set<Flow> curr = new HashSet<>();
329 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
331 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
334 Table currentTable = r.get();
335 curr = new HashSet<>(currentTable.getFlow());
338 // Sets with custom equivalence rules
339 Set<Equivalence.Wrapper<Flow>> oldFlows =
340 new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
341 Set<Equivalence.Wrapper<Flow>> updatedFlows =
342 new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
344 // what is still there but was not updated, needs to be deleted
345 Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
346 Sets.difference(oldFlows, updatedFlows);
347 // new flows (they were not there before)
348 Sets.SetView<Equivalence.Wrapper<Flow>> additions =
349 Sets.difference(updatedFlows, oldFlows);
351 if (!deletions.isEmpty()) {
352 for (Equivalence.Wrapper<Flow> wf: deletions) {
355 t.delete(LogicalDatastoreType.CONFIGURATION,
356 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
360 if (!additions.isEmpty()) {
361 for (Equivalence.Wrapper<Flow> wf: additions) {
364 t.put(LogicalDatastoreType.CONFIGURATION,
365 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
369 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
370 Futures.addCallback(f, new FutureCallback<Void>() {
372 public void onFailure(Throwable t) {
373 LOG.error("Could not write flow table {}", t);
377 public void onSuccess(Void result) {
378 LOG.debug("Flow table updated.");
383 private void purgeFromDataStore() {
384 // TODO: tbachman: Remove for Lithium -- this is a workaround
385 // where some flow-mods aren't getting installed
386 // on vSwitches when changing L3 contexts
387 WriteTransaction d = dataBroker.newWriteOnlyTransaction();
389 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
390 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
393 CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
394 Futures.addCallback(fu, new FutureCallback<Void>() {
396 public void onFailure(Throwable th) {
397 LOG.error("Could not write flow table.", th);
401 public void onSuccess(Void result) {
402 LOG.debug("Flow table updated.");
409 private void scheduleUpdate() {
410 if (switchManager != null) {
411 LOG.trace("Scheduling flow update task");
412 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
417 * Update the flows on a particular switch
419 private class SwitchFlowUpdateTask implements Callable<Void> {
420 private FlowMap flowMap;
422 public SwitchFlowUpdateTask(FlowMap flowMap) {
424 this.flowMap = flowMap;
428 public Void call() throws Exception {
429 for (NodeId node : switchManager.getReadySwitches()) {
430 PolicyInfo info = policyResolver.getCurrentPolicy();
433 for (OfTable table : flowPipeline) {
435 table.update(node, info, flowMap);
436 } catch (Exception e) {
437 LOG.error("Failed to write flow table {}",
438 table.getClass().getSimpleName(), e);
447 * Update all flows on all switches as needed. Note that this will block
448 * one of the threads on the executor.
450 private class FlowUpdateTask implements Runnable {
453 LOG.debug("Beginning flow update task");
455 CompletionService<Void> ecs
456 = new ExecutorCompletionService<>(executor);
459 FlowMap flowMap = new FlowMap();
461 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
465 for (int i = 0; i < n; i++) {
468 flowMap.commitToDataStore();
469 } catch (InterruptedException | ExecutionException e) {
470 LOG.error("Failed to update flow tables", e);
473 LOG.debug("Flow update completed");