2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
47 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
48 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
49 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
52 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
64 import com.google.common.base.Equivalence;
65 import com.google.common.base.Optional;
66 import com.google.common.base.Preconditions;
67 import com.google.common.collect.Collections2;
68 import com.google.common.collect.ImmutableList;
69 import com.google.common.collect.Sets;
70 import com.google.common.util.concurrent.CheckedFuture;
71 import com.google.common.util.concurrent.FutureCallback;
72 import com.google.common.util.concurrent.Futures;
75 * Manage policies on switches by subscribing to updates from the
76 * policy resolver and information about endpoints from the endpoint
79 public class PolicyManager
80 implements SwitchListener, PolicyListener, EndpointListener {
81 private static final Logger LOG =
82 LoggerFactory.getLogger(PolicyManager.class);
84 private short tableOffset;
85 private static final short TABLEID_PORTSECURITY = 0;
86 private static final short TABLEID_INGRESS_NAT = 1;
87 private static final short TABLEID_SOURCE_MAPPER = 2;
88 private static final short TABLEID_DESTINATION_MAPPER = 3;
89 private static final short TABLEID_POLICY_ENFORCER = 4;
90 private static final short TABLEID_EGRESS_NAT = 5;
91 private static final short TABLEID_EXTERNAL_MAPPER = 6;
93 private final SwitchManager switchManager;
94 private final PolicyResolver policyResolver;
96 private final PolicyScope policyScope;
98 private final ScheduledExecutorService executor;
99 private final SingletonTask flowUpdateTask;
100 private final DataBroker dataBroker;
103 * The flow tables that make up the processing pipeline
105 private final List<? extends OfTable> flowPipeline;
108 * The delay before triggering the flow update task in response to an
109 * event in milliseconds.
111 private final static int FLOW_UPDATE_DELAY = 250;
113 public PolicyManager(DataBroker dataBroker,
114 PolicyResolver policyResolver,
115 SwitchManager switchManager,
116 EndpointManager endpointManager,
117 RpcProviderRegistry rpcRegistry,
118 ScheduledExecutorService executor,
121 this.switchManager = switchManager;
122 this.executor = executor;
123 this.policyResolver = policyResolver;
124 this.dataBroker = dataBroker;
125 this.tableOffset = tableOffset;
128 if (dataBroker != null) {
129 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
130 t.put(LogicalDatastoreType.OPERATIONAL,
132 .builder(SubjectFeatureDefinitions.class)
134 SubjectFeatures.OF_OVERLAY_FEATURES);
138 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
139 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
142 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
143 this, policyResolver, switchManager,
144 endpointManager, executor);
146 flowPipeline = ImmutableList.of(new PortSecurity(ctx, (short)(tableOffset+TABLEID_PORTSECURITY)),
148 new IngressNatMapper(ctx, (short)(tableOffset+TABLEID_INGRESS_NAT)),
149 new SourceMapper(ctx, (short)(tableOffset+TABLEID_SOURCE_MAPPER)),
150 new DestinationMapper(ctx, (short)(tableOffset+TABLEID_DESTINATION_MAPPER)),
151 new PolicyEnforcer(ctx, (short)(tableOffset+TABLEID_POLICY_ENFORCER)),
152 new EgressNatMapper(ctx, (short)(tableOffset+TABLEID_EGRESS_NAT)),
153 new ExternalMapper(ctx, (short)(tableOffset+TABLEID_EXTERNAL_MAPPER))
156 policyScope = policyResolver.registerListener(this);
157 if (switchManager != null)
158 switchManager.registerListener(this);
159 endpointManager.registerListener(this);
161 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
164 LOG.debug("Initialized OFOverlay policy manager");
172 public short getTABLEID_PORTSECURITY() {
173 return (short)(tableOffset+TABLEID_PORTSECURITY);
177 public short getTABLEID_INGRESS_NAT() {
178 return (short)(tableOffset+TABLEID_INGRESS_NAT);
182 public short getTABLEID_SOURCE_MAPPER() {
183 return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
187 public short getTABLEID_DESTINATION_MAPPER() {
188 return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
192 public short getTABLEID_POLICY_ENFORCER() {
193 return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
197 public short getTABLEID_EGRESS_NAT() {
198 return (short)(tableOffset+TABLEID_EGRESS_NAT);
202 public short getTABLEID_EXTERNAL_MAPPER() {
203 return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
207 public void switchReady(final NodeId nodeId) {
212 public void switchRemoved(NodeId sw) {
213 // XXX TODO purge switch flows
218 public void switchUpdated(NodeId sw) {
227 public void endpointUpdated(EpKey epKey) {
232 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
237 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
238 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
247 public void policyUpdated(Set<EgKey> updatedConsumers) {
256 * Set the learning mode to the specified value
257 * @param learningMode the learning mode to set
259 public void setLearningMode(LearningMode learningMode) {
267 public class FlowMap{
268 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
273 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
274 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
275 if(this.flowMap.get(tableIid) == null) {
276 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
277 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
279 return this.flowMap.get(tableIid);
282 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
283 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
284 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
285 List<Flow> flows = tableBuilder.getFlow();
286 Set<Equivalence.Wrapper<Flow>> wrappedFlows =
287 new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
289 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
291 if (!wrappedFlows.contains(wFlow)) {
292 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
294 LOG.debug("Flow already exists in FlowMap - {}", flow);
298 public void commitToDataStore() {
299 if (dataBroker != null) {
300 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
303 * Get the currently configured flows for
306 updateFlowTable(entry);
307 } catch (Exception e) {
308 LOG.warn("Couldn't read flow table {}", entry.getKey());
314 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
315 TableBuilder> entry) throws Exception {
317 Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
318 // flows currently in the table
319 Set<Flow> curr = new HashSet<>();
321 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
323 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
326 Table currentTable = r.get();
327 curr = new HashSet<>(currentTable.getFlow());
330 // Sets with custom equivalence rules
331 Set<Equivalence.Wrapper<Flow>> oldFlows =
332 new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
333 Set<Equivalence.Wrapper<Flow>> updatedFlows =
334 new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
336 // what is still there but was not updated, needs to be deleted
337 Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
338 Sets.difference(oldFlows, updatedFlows);
339 // new flows (they were not there before)
340 Sets.SetView<Equivalence.Wrapper<Flow>> additions =
341 Sets.difference(updatedFlows, oldFlows);
343 if (!deletions.isEmpty()) {
344 for (Equivalence.Wrapper<Flow> wf: deletions) {
347 t.delete(LogicalDatastoreType.CONFIGURATION,
348 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
352 if (!additions.isEmpty()) {
353 for (Equivalence.Wrapper<Flow> wf: additions) {
356 t.put(LogicalDatastoreType.CONFIGURATION,
357 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
361 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
362 Futures.addCallback(f, new FutureCallback<Void>() {
364 public void onFailure(Throwable t) {
365 LOG.error("Could not write flow table {}", t);
369 public void onSuccess(Void result) {
370 LOG.debug("Flow table updated.");
377 private void scheduleUpdate() {
378 if (switchManager != null) {
379 LOG.trace("Scheduling flow update task");
380 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
385 * Update the flows on a particular switch
387 private class SwitchFlowUpdateTask implements Callable<Void> {
388 private FlowMap flowMap;
390 public SwitchFlowUpdateTask(FlowMap flowMap) {
392 this.flowMap = flowMap;
396 public Void call() throws Exception {
397 for (NodeId node : switchManager.getReadySwitches()) {
398 PolicyInfo info = policyResolver.getCurrentPolicy();
401 for (OfTable table : flowPipeline) {
403 table.update(node, info, flowMap);
404 } catch (Exception e) {
405 LOG.error("Failed to write flow table {}",
406 table.getClass().getSimpleName(), e);
415 * Update all flows on all switches as needed. Note that this will block
416 * one of the threads on the executor.
418 private class FlowUpdateTask implements Runnable {
421 LOG.debug("Beginning flow update task");
423 CompletionService<Void> ecs
424 = new ExecutorCompletionService<>(executor);
427 FlowMap flowMap = new FlowMap();
429 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
433 for (int i = 0; i < n; i++) {
436 flowMap.commitToDataStore();
437 } catch (InterruptedException | ExecutionException e) {
438 LOG.error("Failed to update flow tables", e);
441 LOG.debug("Flow update completed");