2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map.Entry;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorCompletionService;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
43 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.base.Preconditions;
55 import com.google.common.collect.ImmutableList;
56 import com.google.common.util.concurrent.CheckedFuture;
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
61 * Manage policies on switches by subscribing to updates from the
62 * policy resolver and information about endpoints from the endpoint
65 public class PolicyManager
66 implements SwitchListener, PolicyListener, EndpointListener {
67 private static final Logger LOG =
68 LoggerFactory.getLogger(PolicyManager.class);
70 private final SwitchManager switchManager;
71 private final PolicyResolver policyResolver;
73 private final PolicyScope policyScope;
75 private final ScheduledExecutorService executor;
76 private final SingletonTask flowUpdateTask;
77 private final DataBroker dataBroker;
80 * The flow tables that make up the processing pipeline
82 private final List<? extends OfTable> flowPipeline;
85 * The delay before triggering the flow update task in response to an
86 * event in milliseconds.
88 private final static int FLOW_UPDATE_DELAY = 250;
92 public PolicyManager(DataBroker dataBroker,
93 PolicyResolver policyResolver,
94 SwitchManager switchManager,
95 EndpointManager endpointManager,
96 RpcProviderRegistry rpcRegistry,
97 ScheduledExecutorService executor) {
99 this.switchManager = switchManager;
100 this.executor = executor;
101 this.policyResolver = policyResolver;
102 this.dataBroker = dataBroker;
105 if (dataBroker != null) {
106 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
107 t.put(LogicalDatastoreType.OPERATIONAL,
109 .builder(SubjectFeatureDefinitions.class)
111 SubjectFeatures.OF_OVERLAY_FEATURES);
115 OfContext ctx = new OfContext(dataBroker, rpcRegistry,
116 this, policyResolver, switchManager,
117 endpointManager, executor);
118 flowPipeline = ImmutableList.of(new PortSecurity(ctx),
120 new SourceMapper(ctx),
121 new DestinationMapper(ctx),
122 new PolicyEnforcer(ctx));
124 policyScope = policyResolver.registerListener(this);
125 if (switchManager != null)
126 switchManager.registerListener(this);
127 endpointManager.registerListener(this);
129 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
132 LOG.debug("Initialized OFOverlay policy manager");
140 public void switchReady(final NodeId nodeId) {
141 //TODO Apr15 alagalah : OVSDB CRUD tunnels may go here.
142 // WriteTransaction t = dataBroker.newWriteOnlyTransaction();
144 // NodeBuilder nb = new NodeBuilder()
146 // .addAugmentation(FlowCapableNode.class,
147 // new FlowCapableNodeBuilder()
149 // t.merge(LogicalDatastoreType.CONFIGURATION,
150 // FlowUtils.createNodePath(nodeId),
151 // nb.build(), true);
152 // ListenableFuture<Void> result = t.submit();
153 // Futures.addCallback(result,
154 // new FutureCallback<Void>() {
156 // public void onSuccess(Void result) {
157 // dirty.get().addNode(nodeId);
162 // public void onFailure(Throwable t) {
163 // LOG.error("Could not add switch {}", nodeId, t);
170 public void switchRemoved(NodeId sw) {
171 // XXX TODO purge switch flows
176 public void switchUpdated(NodeId sw) {
185 public void endpointUpdated(EpKey epKey) {
190 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
195 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
196 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
205 public void policyUpdated(Set<EgKey> updatedConsumers) {
214 * Set the learning mode to the specified value
215 * @param learningMode the learning mode to set
217 public void setLearningMode(LearningMode learningMode) {
227 public class FlowMap{
228 private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
233 public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
234 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
235 if(this.flowMap.get(tableIid) == null) {
236 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
237 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
239 return this.flowMap.get(tableIid);
242 public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
243 TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
244 if (!tableBuilder.getFlow().contains(flow)) {
245 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
249 public void commitToDataStore() {
250 if (dataBroker != null) {
251 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
253 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
254 t.put(LogicalDatastoreType.CONFIGURATION,
255 entry.getKey(), entry.getValue().build(),true);
258 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
259 Futures.addCallback(f, new FutureCallback<Void>() {
261 public void onFailure(Throwable t) {
262 LOG.error("Could not write flow table.", t);
266 public void onSuccess(Void result) {
267 LOG.debug("Flow table updated.");
275 private void scheduleUpdate() {
276 if (switchManager != null) {
277 LOG.trace("Scheduling flow update task");
278 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
283 * Update the flows on a particular switch
285 private class SwitchFlowUpdateTask implements Callable<Void> {
286 private FlowMap flowMap;
288 public SwitchFlowUpdateTask(FlowMap flowMap) {
290 this.flowMap = flowMap;
294 public Void call() throws Exception {
295 for (NodeId node : switchManager.getReadySwitches()) {
296 if (!switchManager.isSwitchReady(node))
298 PolicyInfo info = policyResolver.getCurrentPolicy();
301 for (OfTable table : flowPipeline) {
303 table.update(node, info, flowMap);
304 } catch (Exception e) {
305 LOG.error("Failed to write flow table {}",
306 table.getClass().getSimpleName(), e);
315 * Update all flows on all switches as needed. Note that this will block
316 * one of the threads on the executor.
318 private class FlowUpdateTask implements Runnable {
321 LOG.debug("Beginning flow update task");
323 CompletionService<Void> ecs
324 = new ExecutorCompletionService<Void>(executor);
327 FlowMap flowMap = new FlowMap();
329 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
333 for (int i = 0; i < n; i++) {
336 flowMap.commitToDataStore();
337 } catch (InterruptedException | ExecutionException e) {
338 LOG.error("Failed to update flow tables", e);
341 LOG.debug("Flow update completed");