2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
43 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
47 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
48 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import com.google.common.base.Optional;
61 import com.google.common.base.Preconditions;
62 import com.google.common.collect.ImmutableList;
63 import com.google.common.collect.Sets;
64 import com.google.common.util.concurrent.CheckedFuture;
65 import com.google.common.util.concurrent.FutureCallback;
66 import com.google.common.util.concurrent.Futures;
69 * Manage policies on switches by subscribing to updates from the
70 * policy resolver and information about endpoints from the endpoint
73 public class PolicyManager
74 implements SwitchListener, PolicyListener, EndpointListener {
75 private static final Logger LOG =
76 LoggerFactory.getLogger(PolicyManager.class);
78 private final SwitchManager switchManager;
79 private final PolicyResolver policyResolver;
81 private final PolicyScope policyScope;
83 private final ScheduledExecutorService executor;
84 private final SingletonTask flowUpdateTask;
85 private final DataBroker dataBroker;
88 * The flow tables that make up the processing pipeline
90 private final List<? extends OfTable> flowPipeline;
93 * The delay before triggering the flow update task in response to an
94 * event in milliseconds.
96 private final static int FLOW_UPDATE_DELAY = 250;
100 public PolicyManager(DataBroker dataBroker,
101 PolicyResolver policyResolver,
102 SwitchManager switchManager,
103 EndpointManager endpointManager,
104 RpcProviderRegistry rpcRegistry,
105 ScheduledExecutorService executor) {
107 this.switchManager = switchManager;
108 this.executor = executor;
109 this.policyResolver = policyResolver;
110 this.dataBroker = dataBroker;
113 if (dataBroker != null) {
114 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
115 t.put(LogicalDatastoreType.OPERATIONAL,
117 .builder(SubjectFeatureDefinitions.class)
119 SubjectFeatures.OF_OVERLAY_FEATURES);
123 for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
124 policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
127 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
128 this, policyResolver, switchManager,
129 endpointManager, executor);
130 flowPipeline = ImmutableList.of(new PortSecurity(ctx),
132 new SourceMapper(ctx),
133 new DestinationMapper(ctx),
134 new PolicyEnforcer(ctx));
136 policyScope = policyResolver.registerListener(this);
137 if (switchManager != null)
138 switchManager.registerListener(this);
139 endpointManager.registerListener(this);
141 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
144 LOG.debug("Initialized OFOverlay policy manager");
152 public void switchReady(final NodeId nodeId) {
157 public void switchRemoved(NodeId sw) {
158 // XXX TODO purge switch flows
163 public void switchUpdated(NodeId sw) {
172 public void endpointUpdated(EpKey epKey) {
177 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
182 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
183 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
192 public void policyUpdated(Set<EgKey> updatedConsumers) {
201 * Set the learning mode to the specified value
202 * @param learningMode the learning mode to set
204 public void setLearningMode(LearningMode learningMode) {
214 public class FlowMap{
215 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
220 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
221 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
222 if(this.flowMap.get(tableIid) == null) {
223 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
224 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
226 return this.flowMap.get(tableIid);
229 public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
230 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
231 if (!tableBuilder.getFlow().contains(flow)) {
232 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
236 public void commitToDataStore() {
237 if (dataBroker != null) {
238 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
241 * Get the currently configured flows for
244 updateFlowTable(entry);
245 } catch (Exception e) {
246 LOG.warn("Couldn't read flow table {}", entry.getKey());
252 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
253 TableBuilder> entry) throws Exception {
254 Set<Flow> update = new HashSet<Flow>(entry.getValue().getFlow());
255 Set<Flow> curr = new HashSet<Flow>();
257 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
259 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
262 Table curTable = r.get();
263 curr = new HashSet<Flow>(curTable.getFlow());
265 Sets.SetView<Flow> deletions = Sets.difference(curr, update);
266 Sets.SetView<Flow> additions = Sets.difference(update, curr);
267 if (!deletions.isEmpty()) {
268 for (Flow f: deletions) {
269 t.delete(LogicalDatastoreType.CONFIGURATION,
270 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
273 if (!additions.isEmpty()) {
274 for (Flow f: additions) {
275 t.put(LogicalDatastoreType.CONFIGURATION,
276 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
279 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
280 Futures.addCallback(f, new FutureCallback<Void>() {
282 public void onFailure(Throwable t) {
283 LOG.error("Could not write flow table {}", t);
287 public void onSuccess(Void result) {
288 LOG.debug("Flow table updated.");
293 private void purgeFromDataStore() {
294 // TODO: tbachman: Remove for Lithium -- this is a workaround
295 // where some flow-mods aren't getting installed
296 // on vSwitches when changing L3 contexts
297 WriteTransaction d = dataBroker.newWriteOnlyTransaction();
299 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
300 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
303 CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
304 Futures.addCallback(fu, new FutureCallback<Void>() {
306 public void onFailure(Throwable th) {
307 LOG.error("Could not write flow table.", th);
311 public void onSuccess(Void result) {
312 LOG.debug("Flow table updated.");
319 private void scheduleUpdate() {
320 if (switchManager != null) {
321 LOG.trace("Scheduling flow update task");
322 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
327 * Update the flows on a particular switch
329 private class SwitchFlowUpdateTask implements Callable<Void> {
330 private FlowMap flowMap;
332 public SwitchFlowUpdateTask(FlowMap flowMap) {
334 this.flowMap = flowMap;
338 public Void call() throws Exception {
339 for (NodeId node : switchManager.getReadySwitches()) {
340 PolicyInfo info = policyResolver.getCurrentPolicy();
343 for (OfTable table : flowPipeline) {
345 table.update(node, info, flowMap);
346 } catch (Exception e) {
347 LOG.error("Failed to write flow table {}",
348 table.getClass().getSimpleName(), e);
357 * Update all flows on all switches as needed. Note that this will block
358 * one of the threads on the executor.
360 private class FlowUpdateTask implements Runnable {
363 LOG.debug("Beginning flow update task");
365 CompletionService<Void> ecs
366 = new ExecutorCompletionService<Void>(executor);
369 FlowMap flowMap = new FlowMap();
371 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
375 for (int i = 0; i < n; i++) {
378 flowMap.commitToDataStore();
379 } catch (InterruptedException | ExecutionException e) {
380 LOG.error("Failed to update flow tables", e);
383 LOG.debug("Flow update completed");