2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
42 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
47 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import java.util.ArrayList;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Map.Entry;
65 import java.util.concurrent.Callable;
66 import java.util.concurrent.CompletionService;
67 import java.util.concurrent.ConcurrentHashMap;
68 import java.util.concurrent.ConcurrentMap;
69 import java.util.concurrent.ExecutionException;
70 import java.util.concurrent.ExecutorCompletionService;
71 import java.util.concurrent.ScheduledExecutorService;
72 import java.util.concurrent.TimeUnit;
75 * Manage policies on switches by subscribing to updates from the
76 * policy resolver and information about endpoints from the endpoint
79 public class PolicyManager
80 implements SwitchListener, PolicyListener, EndpointListener {
81 private static final Logger LOG =
82 LoggerFactory.getLogger(PolicyManager.class);
84 private short tableOffset;
85 private final short TABLEID_PORTSECURITY = 0;
86 private final short TABLEID_INGRESS_NAT = (short) (tableOffset+1);
87 private final short TABLEID_SOURCE_MAPPER = (short) (tableOffset+2);
88 private final short TABLEID_DESTINATION_MAPPER = (short) (tableOffset+3);
89 private final short TABLEID_POLICY_ENFORCER = (short) (tableOffset+4);
90 private final short TABLEID_EGRESS_NAT = (short) (tableOffset+5);
91 private final short TABLEID_EXTERNAL_MAPPER = (short) (tableOffset+6);
93 private static MacAddress externaMacAddress;
95 private final SwitchManager switchManager;
96 private final PolicyResolver policyResolver;
98 private final PolicyScope policyScope;
100 private final ScheduledExecutorService executor;
101 private final SingletonTask flowUpdateTask;
102 private final DataBroker dataBroker;
105 * The flow tables that make up the processing pipeline
107 private final List<? extends OfTable> flowPipeline;
110 * The delay before triggering the flow update task in response to an
111 * event in milliseconds.
113 private final static int FLOW_UPDATE_DELAY = 250;
115 public PolicyManager(DataBroker dataBroker,
116 PolicyResolver policyResolver,
117 SwitchManager switchManager,
118 EndpointManager endpointManager,
119 RpcProviderRegistry rpcRegistry,
120 ScheduledExecutorService executor,
122 MacAddress externalRouterMac) {
124 this.switchManager = switchManager;
125 this.executor = executor;
126 this.policyResolver = policyResolver;
127 this.dataBroker = dataBroker;
128 this.tableOffset=tableOffset;
129 this.externaMacAddress=externalRouterMac;
132 if (dataBroker != null) {
133 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
134 t.put(LogicalDatastoreType.OPERATIONAL,
136 .builder(SubjectFeatureDefinitions.class)
138 SubjectFeatures.OF_OVERLAY_FEATURES);
142 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
143 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
146 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
147 this, policyResolver, switchManager,
148 endpointManager, executor);
150 flowPipeline = ImmutableList.of(new PortSecurity(ctx,TABLEID_PORTSECURITY),
152 new IngressNatMapper(ctx,TABLEID_INGRESS_NAT),
153 new SourceMapper(ctx,TABLEID_SOURCE_MAPPER),
154 new DestinationMapper(ctx,TABLEID_DESTINATION_MAPPER),
155 new PolicyEnforcer(ctx,TABLEID_POLICY_ENFORCER),
156 new EgressNatMapper(ctx,TABLEID_EGRESS_NAT),
157 new ExternalMapper(ctx,TABLEID_EXTERNAL_MAPPER)
160 policyScope = policyResolver.registerListener(this);
161 if (switchManager != null)
162 switchManager.registerListener(this);
163 endpointManager.registerListener(this);
165 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
168 LOG.debug("Initialized OFOverlay policy manager");
176 public short getTABLEID_PORTSECURITY() {
177 return TABLEID_PORTSECURITY;
181 public short getTABLEID_INGRESS_NAT() {
182 return TABLEID_INGRESS_NAT;
186 public short getTABLEID_SOURCE_MAPPER() {
187 return TABLEID_SOURCE_MAPPER;
191 public short getTABLEID_DESTINATION_MAPPER() {
192 return TABLEID_DESTINATION_MAPPER;
196 public short getTABLEID_POLICY_ENFORCER() {
197 return TABLEID_POLICY_ENFORCER;
201 public short getTABLEID_EGRESS_NAT() {
202 return TABLEID_EGRESS_NAT;
206 public short getTABLEID_EXTERNAL_MAPPER() {
207 return TABLEID_EXTERNAL_MAPPER;
211 public void switchReady(final NodeId nodeId) {
216 public void switchRemoved(NodeId sw) {
217 // XXX TODO purge switch flows
222 public void switchUpdated(NodeId sw) {
231 public void endpointUpdated(EpKey epKey) {
236 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
241 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
242 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
251 public void policyUpdated(Set<EgKey> updatedConsumers) {
260 * Set the learning mode to the specified value
261 * @param learningMode the learning mode to set
263 public void setLearningMode(LearningMode learningMode) {
267 public static MacAddress getExternaMacAddress() {
268 return externaMacAddress;
275 public class FlowMap{
276 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
281 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
282 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
283 if(this.flowMap.get(tableIid) == null) {
284 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
285 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
287 return this.flowMap.get(tableIid);
290 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
291 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
292 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
293 List<Flow> flows = tableBuilder.getFlow();
294 Set<Equivalence.Wrapper<Flow>> wrappedFlows =
295 new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
297 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
299 if (!wrappedFlows.contains(wFlow)) {
300 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
304 public void commitToDataStore() {
305 if (dataBroker != null) {
306 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
309 * Get the currently configured flows for
312 updateFlowTable(entry);
313 } catch (Exception e) {
314 LOG.warn("Couldn't read flow table {}", entry.getKey());
320 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
321 TableBuilder> entry) throws Exception {
323 Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
324 // flows currently in the table
325 Set<Flow> curr = new HashSet<>();
327 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
329 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
332 Table currentTable = r.get();
333 curr = new HashSet<>(currentTable.getFlow());
336 // Sets with custom equivalence rules
337 Set<Equivalence.Wrapper<Flow>> oldFlows =
338 new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
339 Set<Equivalence.Wrapper<Flow>> updatedFlows =
340 new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
342 // what is still there but was not updated, needs to be deleted
343 Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
344 Sets.difference(oldFlows, updatedFlows);
345 // new flows (they were not there before)
346 Sets.SetView<Equivalence.Wrapper<Flow>> additions =
347 Sets.difference(updatedFlows, oldFlows);
349 if (!deletions.isEmpty()) {
350 for (Equivalence.Wrapper<Flow> wf: deletions) {
353 t.delete(LogicalDatastoreType.CONFIGURATION,
354 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
358 if (!additions.isEmpty()) {
359 for (Equivalence.Wrapper<Flow> wf: additions) {
362 t.put(LogicalDatastoreType.CONFIGURATION,
363 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
367 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
368 Futures.addCallback(f, new FutureCallback<Void>() {
370 public void onFailure(Throwable t) {
371 LOG.error("Could not write flow table {}", t);
375 public void onSuccess(Void result) {
376 LOG.debug("Flow table updated.");
381 private void purgeFromDataStore() {
382 // TODO: tbachman: Remove for Lithium -- this is a workaround
383 // where some flow-mods aren't getting installed
384 // on vSwitches when changing L3 contexts
385 WriteTransaction d = dataBroker.newWriteOnlyTransaction();
387 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
388 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
391 CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
392 Futures.addCallback(fu, new FutureCallback<Void>() {
394 public void onFailure(Throwable th) {
395 LOG.error("Could not write flow table.", th);
399 public void onSuccess(Void result) {
400 LOG.debug("Flow table updated.");
407 private void scheduleUpdate() {
408 if (switchManager != null) {
409 LOG.trace("Scheduling flow update task");
410 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
415 * Update the flows on a particular switch
417 private class SwitchFlowUpdateTask implements Callable<Void> {
418 private FlowMap flowMap;
420 public SwitchFlowUpdateTask(FlowMap flowMap) {
422 this.flowMap = flowMap;
426 public Void call() throws Exception {
427 for (NodeId node : switchManager.getReadySwitches()) {
428 PolicyInfo info = policyResolver.getCurrentPolicy();
431 for (OfTable table : flowPipeline) {
433 table.update(node, info, flowMap);
434 } catch (Exception e) {
435 LOG.error("Failed to write flow table {}",
436 table.getClass().getSimpleName(), e);
445 * Update all flows on all switches as needed. Note that this will block
446 * one of the threads on the executor.
448 private class FlowUpdateTask implements Runnable {
451 LOG.debug("Beginning flow update task");
453 CompletionService<Void> ecs
454 = new ExecutorCompletionService<>(executor);
457 FlowMap flowMap = new FlowMap();
459 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
463 for (int i = 0; i < n; i++) {
466 flowMap.commitToDataStore();
467 } catch (InterruptedException | ExecutionException e) {
468 LOG.error("Failed to update flow tables", e);
471 LOG.debug("Flow update completed");