/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable.OfTableCtx;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
import org.opendaylight.groupbasedpolicy.resolver.EgKey;
import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
import org.opendaylight.groupbasedpolicy.util.SetUtils;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
58 * Manage policies on switches by subscribing to updates from the
59 * policy resolver and information about endpoints from the endpoint
63 public class PolicyManager
64 implements SwitchListener, PolicyListener, EndpointListener {
65 private static final Logger LOG =
66 LoggerFactory.getLogger(PolicyManager.class);
68 private final SwitchManager switchManager;
69 private final PolicyResolver policyResolver;
71 private final PolicyScope policyScope;
73 private final AtomicReference<Dirty> dirty;
75 private final ScheduledExecutorService executor;
76 private final SingletonTask flowUpdateTask;
79 * The flow tables that make up the processing pipeline
81 private final List<? extends OfTable> flowPipeline;
84 * The delay before triggering the flow update task in response to an
85 * event in milliseconds.
87 private final static int FLOW_UPDATE_DELAY = 250;
90 * Counter used to allocate ordinal values for forwarding contexts
93 private final AtomicInteger policyOrdinal = new AtomicInteger(1);
96 * Keep track of currently-allocated ordinals
98 // XXX For the endpoint groups, we need a globally unique ordinal, so
99 // should ultimately involve some sort of distributed agreement
100 // or a leader to allocate them. For now we'll just use a counter and
101 // this local map. Also theoretically need to garbage collect periodically
102 private final ConcurrentMap<String, Integer> ordinals =
103 new ConcurrentHashMap<>();
104 // XXX - need to garbage collect
105 private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals =
106 new ConcurrentHashMap<>();
108 public PolicyManager(DataBroker dataBroker,
109 PolicyResolver policyResolver,
110 SwitchManager switchManager,
111 EndpointManager endpointManager,
112 RpcProviderRegistry rpcRegistry,
113 ScheduledExecutorService executor) {
115 this.switchManager = switchManager;
116 this.executor = executor;
117 this.policyResolver = policyResolver;
119 if (dataBroker != null) {
120 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
121 t.put(LogicalDatastoreType.OPERATIONAL,
123 .builder(SubjectFeatureDefinitions.class)
125 SubjectFeatures.OF_OVERLAY_FEATURES);
129 OfTableCtx ctx = new OfTableCtx(dataBroker, rpcRegistry,
130 this, policyResolver, switchManager,
131 endpointManager, executor);
132 flowPipeline = ImmutableList.of(new PortSecurity(ctx),
134 new SourceMapper(ctx),
135 new DestinationMapper(ctx),
136 new PolicyEnforcer(ctx));
138 policyScope = policyResolver.registerListener(this);
139 if (switchManager != null)
140 switchManager.registerListener(this);
141 endpointManager.registerListener(this);
143 dirty = new AtomicReference<>(new Dirty());
145 flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
148 LOG.debug("Initialized OFOverlay policy manager");
156 public void switchReady(final NodeId nodeId) {
157 // WriteTransaction t = dataBroker.newWriteOnlyTransaction();
159 // NodeBuilder nb = new NodeBuilder()
161 // .addAugmentation(FlowCapableNode.class,
162 // new FlowCapableNodeBuilder()
164 // t.merge(LogicalDatastoreType.CONFIGURATION,
165 // FlowUtils.createNodePath(nodeId),
166 // nb.build(), true);
167 // ListenableFuture<Void> result = t.submit();
168 // Futures.addCallback(result,
169 // new FutureCallback<Void>() {
171 // public void onSuccess(Void result) {
172 // dirty.get().addNode(nodeId);
177 // public void onFailure(Throwable t) {
178 // LOG.error("Could not add switch {}", nodeId, t);
185 public void switchRemoved(NodeId sw) {
186 // XXX TODO purge switch flows
187 dirty.get().addNode(sw);
192 public void switchUpdated(NodeId sw) {
193 dirty.get().addNode(sw);
202 public void endpointUpdated(EpKey epKey) {
203 dirty.get().addEndpoint(epKey);
208 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
209 dirty.get().addNodeEp(nodeId, epKey);
214 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
215 dirty.get().addEndpointGroupEp(egKey, epKey);
216 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
225 public void policyUpdated(Set<EgKey> updatedConsumers) {
226 for (EgKey key : updatedConsumers) {
227 dirty.get().addEndpointGroup(key);
237 * Set the learning mode to the specified value
238 * @param learningMode the learning mode to set
240 public void setLearningMode(LearningMode learningMode) {
245 * Get a unique ordinal for the given condition group, suitable for
246 * use in the data plane. This is unique only for this node, and not
248 * @param cg the {@link ConditionGroup}
249 * @return the unique ID
251 public int getCondGroupOrdinal(final ConditionGroup cg) {
252 if (cg == null) return 0;
253 Integer ord = cgOrdinals.get(cg);
255 ord = policyOrdinal.getAndIncrement();
256 Integer old = cgOrdinals.putIfAbsent(cg, ord);
257 if (old != null) ord = old;
259 return ord.intValue();
263 * Get a 32-bit context ordinal suitable for use in the OF data plane
264 * for the given policy item.
265 * @param tenantId the tenant ID of the element
266 * @param id the unique ID for the element
267 * @return the 32-bit ordinal value
269 public int getContextOrdinal(final TenantId tenantId,
270 final UniqueId id) throws Exception {
271 if (tenantId == null || id == null) return 0;
272 return getContextOrdinal(tenantId.getValue() + "|" + id.getValue());
276 * Get a 32-bit context ordinal suitable for use in the OF data plane
277 * for the given policy item.
278 * @param id the unique ID for the element
279 * @return the 32-bit ordinal value
281 public int getContextOrdinal(final String id) throws Exception {
283 Integer ord = ordinals.get(id);
285 ord = policyOrdinal.getAndIncrement();
286 Integer old = ordinals.putIfAbsent(id, ord);
287 if (old != null) ord = old;
289 return ord.intValue();
296 private void scheduleUpdate() {
297 if (switchManager != null) {
298 LOG.trace("Scheduling flow update task");
299 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
304 * Update the flows on a particular switch
306 private class SwitchFlowUpdateTask implements Callable<Void> {
307 private final Dirty dirty;
308 private final NodeId nodeId;
310 public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
313 this.nodeId = nodeId;
317 public Void call() throws Exception {
318 if (!switchManager.isSwitchReady(nodeId)) return null;
319 PolicyInfo info = policyResolver.getCurrentPolicy();
320 if (info == null) return null;
321 for (OfTable table : flowPipeline) {
323 table.update(nodeId, info, dirty);
324 } catch (Exception e) {
325 LOG.error("Failed to write flow table {}",
326 table.getClass().getSimpleName(), e);
334 * Update all flows on all switches as needed. Note that this will block
335 * one of the threads on the executor.
338 private class FlowUpdateTask implements Runnable {
341 LOG.debug("Beginning flow update task");
343 Dirty d = dirty.getAndSet(new Dirty());
344 CompletionService<Void> ecs
345 = new ExecutorCompletionService<Void>(executor);
347 for (NodeId node : switchManager.getReadySwitches()) {
348 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(d, node);
352 for (int i = 0; i < n; i++) {
355 } catch (InterruptedException | ExecutionException e) {
356 LOG.error("Failed to update flow tables", e);
359 LOG.debug("Flow update completed");
364 * Dirty state since our last successful flow table sync.
366 public static class Dirty {
367 private Set<EpKey> endpoints;
368 private Set<NodeId> nodes;
369 private Set<EgKey> groups;
370 private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
371 private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
374 ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
375 endpoints = Collections.newSetFromMap(epmap);
376 ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
377 nodes = Collections.newSetFromMap(nomap);
378 ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
379 groups = Collections.newSetFromMap(grmap);
381 groupEps = new ConcurrentHashMap<>();
382 nodeEps = new ConcurrentHashMap<>();
385 public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
386 SetUtils.getNestedSet(egKey, groupEps)
389 public void addNodeEp(NodeId id, EpKey epKey) {
390 SetUtils.getNestedSet(id, nodeEps).add(epKey);
392 public void addNode(NodeId id) {
395 public void addEndpointGroup(EgKey key) {
398 public void addEndpoint(EpKey epKey) {
399 endpoints.add(epKey);
402 public Set<EpKey> getEndpoints() {
406 public Set<NodeId> getNodes() {
410 public Set<EgKey> getGroups() {
414 public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
418 public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {