2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
42 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
47 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 import com.google.common.base.Optional;
59 import com.google.common.base.Preconditions;
60 import com.google.common.collect.ImmutableList;
61 import com.google.common.collect.Sets;
62 import com.google.common.util.concurrent.CheckedFuture;
63 import com.google.common.util.concurrent.FutureCallback;
64 import com.google.common.util.concurrent.Futures;
67 * Manage policies on switches by subscribing to updates from the
68 * policy resolver and information about endpoints from the endpoint
71 public class PolicyManager
72 implements SwitchListener, PolicyListener, EndpointListener {
73 private static final Logger LOG =
74 LoggerFactory.getLogger(PolicyManager.class);
76 private final SwitchManager switchManager;
77 private final PolicyResolver policyResolver;
79 private final PolicyScope policyScope;
81 private final ScheduledExecutorService executor;
82 private final SingletonTask flowUpdateTask;
83 private final DataBroker dataBroker;
86 * The flow tables that make up the processing pipeline
88 private final List<? extends OfTable> flowPipeline;
91 * The delay before triggering the flow update task in response to an
92 * event in milliseconds.
94 private final static int FLOW_UPDATE_DELAY = 250;
98 public PolicyManager(DataBroker dataBroker,
99 PolicyResolver policyResolver,
100 SwitchManager switchManager,
101 EndpointManager endpointManager,
102 RpcProviderRegistry rpcRegistry,
103 ScheduledExecutorService executor) {
105 this.switchManager = switchManager;
106 this.executor = executor;
107 this.policyResolver = policyResolver;
108 this.dataBroker = dataBroker;
111 if (dataBroker != null) {
112 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
113 t.put(LogicalDatastoreType.OPERATIONAL,
115 .builder(SubjectFeatureDefinitions.class)
117 SubjectFeatures.OF_OVERLAY_FEATURES);
121 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
122 this, policyResolver, switchManager,
123 endpointManager, executor);
124 flowPipeline = ImmutableList.of(new PortSecurity(ctx),
126 new SourceMapper(ctx),
127 new DestinationMapper(ctx),
128 new PolicyEnforcer(ctx));
130 policyScope = policyResolver.registerListener(this);
131 if (switchManager != null)
132 switchManager.registerListener(this);
133 endpointManager.registerListener(this);
135 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
138 LOG.debug("Initialized OFOverlay policy manager");
146 public void switchReady(final NodeId nodeId) {
151 public void switchRemoved(NodeId sw) {
152 // XXX TODO purge switch flows
157 public void switchUpdated(NodeId sw) {
166 public void endpointUpdated(EpKey epKey) {
171 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
176 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
177 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
186 public void policyUpdated(Set<EgKey> updatedConsumers) {
195 * Set the learning mode to the specified value
196 * @param learningMode the learning mode to set
198 public void setLearningMode(LearningMode learningMode) {
208 public class FlowMap{
209 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
214 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
215 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
216 if(this.flowMap.get(tableIid) == null) {
217 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
218 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
220 return this.flowMap.get(tableIid);
223 public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
224 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
225 if (!tableBuilder.getFlow().contains(flow)) {
226 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
230 public void commitToDataStore() {
231 if (dataBroker != null) {
232 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
235 * Get the currently configured flows for
238 updateFlowTable(entry);
239 } catch (Exception e) {
240 LOG.warn("Couldn't read flow table {}", entry.getKey());
246 private void updateFlowTable(Entry<InstanceIdentifier<Table>,
247 TableBuilder> entry) throws Exception {
248 Set<Flow> update = new HashSet<Flow>(entry.getValue().getFlow());
249 Set<Flow> curr = new HashSet<Flow>();
251 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
253 t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
256 Table curTable = r.get();
257 curr = new HashSet<Flow>(curTable.getFlow());
259 Sets.SetView<Flow> deletions = Sets.difference(curr, update);
260 Sets.SetView<Flow> additions = Sets.difference(update, curr);
261 if (!deletions.isEmpty()) {
262 for (Flow f: deletions) {
263 t.delete(LogicalDatastoreType.CONFIGURATION,
264 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
267 if (!additions.isEmpty()) {
268 for (Flow f: additions) {
269 t.put(LogicalDatastoreType.CONFIGURATION,
270 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
273 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
274 Futures.addCallback(f, new FutureCallback<Void>() {
276 public void onFailure(Throwable t) {
277 LOG.error("Could not write flow table {}", t);
281 public void onSuccess(Void result) {
282 LOG.debug("Flow table updated.");
287 private void purgeFromDataStore() {
288 // TODO: tbachman: Remove for Lithium -- this is a workaround
289 // where some flow-mods aren't getting installed
290 // on vSwitches when changing L3 contexts
291 WriteTransaction d = dataBroker.newWriteOnlyTransaction();
293 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
294 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
297 CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
298 Futures.addCallback(fu, new FutureCallback<Void>() {
300 public void onFailure(Throwable th) {
301 LOG.error("Could not write flow table.", th);
305 public void onSuccess(Void result) {
306 LOG.debug("Flow table updated.");
313 private void scheduleUpdate() {
314 if (switchManager != null) {
315 LOG.trace("Scheduling flow update task");
316 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
321 * Update the flows on a particular switch
323 private class SwitchFlowUpdateTask implements Callable<Void> {
324 private FlowMap flowMap;
326 public SwitchFlowUpdateTask(FlowMap flowMap) {
328 this.flowMap = flowMap;
332 public Void call() throws Exception {
333 for (NodeId node : switchManager.getReadySwitches()) {
334 PolicyInfo info = policyResolver.getCurrentPolicy();
337 for (OfTable table : flowPipeline) {
339 table.update(node, info, flowMap);
340 } catch (Exception e) {
341 LOG.error("Failed to write flow table {}",
342 table.getClass().getSimpleName(), e);
351 * Update all flows on all switches as needed. Note that this will block
352 * one of the threads on the executor.
354 private class FlowUpdateTask implements Runnable {
357 LOG.debug("Beginning flow update task");
359 CompletionService<Void> ecs
360 = new ExecutorCompletionService<Void>(executor);
363 FlowMap flowMap = new FlowMap();
365 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
369 for (int i = 0; i < n; i++) {
372 flowMap.commitToDataStore();
373 } catch (InterruptedException | ExecutionException e) {
374 LOG.error("Failed to update flow tables", e);
377 LOG.debug("Flow update completed");