2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
39 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
44 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
56 import java.util.ArrayList;
57 import java.util.HashSet;
58 import java.util.List;
59 import java.util.Map.Entry;
61 import java.util.concurrent.Callable;
62 import java.util.concurrent.CompletionService;
63 import java.util.concurrent.ConcurrentHashMap;
64 import java.util.concurrent.ConcurrentMap;
65 import java.util.concurrent.ExecutionException;
66 import java.util.concurrent.ExecutorCompletionService;
67 import java.util.concurrent.ScheduledExecutorService;
68 import java.util.concurrent.TimeUnit;
/**
 * Manage policies on switches by subscribing to updates from the
 * policy resolver and to information about endpoints from the endpoint
 * manager.
 */
75 public class PolicyManager
76 implements SwitchListener, PolicyListener, EndpointListener {
77 private static final Logger LOG =
78 LoggerFactory.getLogger(PolicyManager.class);
80 private final SwitchManager switchManager;
81 private final PolicyResolver policyResolver;
83 private final PolicyScope policyScope;
85 private final ScheduledExecutorService executor;
86 private final SingletonTask flowUpdateTask;
87 private final DataBroker dataBroker;
90 * The flow tables that make up the processing pipeline
92 private final List<? extends OfTable> flowPipeline;
95 * The delay before triggering the flow update task in response to an
96 * event in milliseconds.
98 private final static int FLOW_UPDATE_DELAY = 250;
/**
 * Create the policy manager and wire it to its collaborators.
 *
 * The constructor stores the collaborators, queues a put of
 * {@code SubjectFeatures.OF_OVERLAY_FEATURES} to the operational datastore
 * when a data broker is supplied, registers every action definition from
 * {@code SubjectFeatures.getActions()} with the policy resolver, builds the
 * flow table pipeline, registers this object as a listener, and creates the
 * flow update task.
 *
 * @param dataBroker      datastore access; may be null, in which case the
 *                        feature definitions are not written
 * @param policyResolver  source of policy; also receives the action
 *                        definitions and this object as a policy listener
 * @param switchManager   source of switch events; may be null, in which case
 *                        no switch listener is registered
 * @param endpointManager source of endpoint events; dereferenced
 *                        unconditionally, so it must not be null
 * @param rpcRegistry     passed through to the {@code OfContext}
 * @param executor        executor handed to the {@code OfContext} and to the
 *                        flow update {@code SingletonTask}
 */
100 public PolicyManager(DataBroker dataBroker,
101 PolicyResolver policyResolver,
102 SwitchManager switchManager,
103 EndpointManager endpointManager,
104 RpcProviderRegistry rpcRegistry,
105 ScheduledExecutorService executor) {
107 this.switchManager = switchManager;
108 this.executor = executor;
109 this.policyResolver = policyResolver;
110 this.dataBroker = dataBroker;
// Advertise the subject features supported by this renderer (only possible with a broker).
113 if (dataBroker != null) {
114 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
115 t.put(LogicalDatastoreType.OPERATIONAL,
117 .builder(SubjectFeatureDefinitions.class)
119 SubjectFeatures.OF_OVERLAY_FEATURES);
// Make every action known to SubjectFeatures available to the policy resolver.
123 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
124 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
// The shared context handed to every table in the pipeline; note that it receives `this`.
127 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
128 this, policyResolver, switchManager,
129 endpointManager, executor);
// The tables are stored in the order listed here.
130 flowPipeline = ImmutableList.of(new PortSecurity(ctx),
132 new SourceMapper(ctx),
133 new DestinationMapper(ctx),
134 new PolicyEnforcer(ctx));
// NOTE(review): `this` is registered with the resolver and managers below before
// flowUpdateTask is assigned. scheduleUpdate() dereferences flowUpdateTask, so a
// listener callback delivered during construction could hit a null field — confirm
// that no callback can fire before the constructor returns.
136 policyScope = policyResolver.registerListener(this);
137 if (switchManager != null)
138 switchManager.registerListener(this);
// NOTE(review): unlike switchManager, endpointManager is not null-checked here.
139 endpointManager.registerListener(this);
141 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
144 LOG.debug("Initialized OFOverlay policy manager");
152 public void switchReady(final NodeId nodeId) {
157 public void switchRemoved(NodeId sw) {
158 // XXX TODO purge switch flows
163 public void switchUpdated(NodeId sw) {
172 public void endpointUpdated(EpKey epKey) {
177 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
182 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
183 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
192 public void policyUpdated(Set<EgKey> updatedConsumers) {
201 * Set the learning mode to the specified value
202 * @param learningMode the learning mode to set
204 public void setLearningMode(LearningMode learningMode) {
214 public class FlowMap{
215 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
220 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
221 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
222 if(this.flowMap.get(tableIid) == null) {
223 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
224 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
226 return this.flowMap.get(tableIid);
229 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
230 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
231 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
232 List<Flow> flows = tableBuilder.getFlow();
233 Set<Equivalence.Wrapper<Flow>> wrappedFlows =
234 new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
236 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
238 if (!wrappedFlows.contains(wFlow)) {
239 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
243 public void commitToDataStore() {
244 if (dataBroker != null) {
245 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
248 * Get the currently configured flows for
251 updateFlowTable(entry);
252 } catch (Exception e) {
253 LOG.warn("Couldn't read flow table {}", entry.getKey());
259 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
260 TableBuilder> entry) throws Exception {
262 Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
263 // flows currently in the table
264 Set<Flow> curr = new HashSet<>();
266 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
268 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
271 Table currentTable = r.get();
272 curr = new HashSet<>(currentTable.getFlow());
275 // Sets with custom equivalence rules
276 Set<Equivalence.Wrapper<Flow>> oldFlows =
277 new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
278 Set<Equivalence.Wrapper<Flow>> updatedFlows =
279 new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
281 // what is still there but was not updated, needs to be deleted
282 Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
283 Sets.difference(oldFlows, updatedFlows);
284 // new flows (they were not there before)
285 Sets.SetView<Equivalence.Wrapper<Flow>> additions =
286 Sets.difference(updatedFlows, oldFlows);
288 if (!deletions.isEmpty()) {
289 for (Equivalence.Wrapper<Flow> wf: deletions) {
292 t.delete(LogicalDatastoreType.CONFIGURATION,
293 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
297 if (!additions.isEmpty()) {
298 for (Equivalence.Wrapper<Flow> wf: additions) {
301 t.put(LogicalDatastoreType.CONFIGURATION,
302 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
306 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
307 Futures.addCallback(f, new FutureCallback<Void>() {
309 public void onFailure(Throwable t) {
310 LOG.error("Could not write flow table {}", t);
314 public void onSuccess(Void result) {
315 LOG.debug("Flow table updated.");
320 private void purgeFromDataStore() {
321 // TODO: tbachman: Remove for Lithium -- this is a workaround
322 // where some flow-mods aren't getting installed
323 // on vSwitches when changing L3 contexts
324 WriteTransaction d = dataBroker.newWriteOnlyTransaction();
326 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
327 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
330 CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
331 Futures.addCallback(fu, new FutureCallback<Void>() {
333 public void onFailure(Throwable th) {
334 LOG.error("Could not write flow table.", th);
338 public void onSuccess(Void result) {
339 LOG.debug("Flow table updated.");
346 private void scheduleUpdate() {
347 if (switchManager != null) {
348 LOG.trace("Scheduling flow update task");
349 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
354 * Update the flows on a particular switch
// Callable that asks every table in flowPipeline to contribute its flows to the
// shared FlowMap, for each switch reported by switchManager.getReadySwitches().
// NOTE(review): despite the "particular switch" wording of the class description,
// call() loops over all ready switches — confirm which is intended.
356 private class SwitchFlowUpdateTask implements Callable<Void> {
// Accumulator that the tables write their flows into; supplied by the caller.
357 private FlowMap flowMap;
359 public SwitchFlowUpdateTask(FlowMap flowMap) {
361 this.flowMap = flowMap;
365 public Void call() throws Exception {
366 for (NodeId node : switchManager.getReadySwitches()) {
// The current policy is fetched from the resolver once per switch.
367 PolicyInfo info = policyResolver.getCurrentPolicy();
370 for (OfTable table : flowPipeline) {
372 table.update(node, info, flowMap);
// A failure raised by a table's update is caught and logged with the table's class name.
373 } catch (Exception e) {
374 LOG.error("Failed to write flow table {}",
375 table.getClass().getSimpleName(), e);
384 * Update all flows on all switches as needed. Note that this will block
385 * one of the threads on the executor.
387 private class FlowUpdateTask implements Runnable {
390 LOG.debug("Beginning flow update task");
392 CompletionService<Void> ecs
393 = new ExecutorCompletionService<>(executor);
396 FlowMap flowMap = new FlowMap();
398 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
402 for (int i = 0; i < n; i++) {
405 flowMap.commitToDataStore();
406 } catch (InterruptedException | ExecutionException e) {
407 LOG.error("Failed to update flow tables", e);
410 LOG.debug("Flow update completed");