/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import java.util.Collections;
12 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CompletionService;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorCompletionService;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicReference;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
43 import org.opendaylight.groupbasedpolicy.util.SetUtils;
44 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import com.google.common.collect.ImmutableList;
/**
 * Manage policies on switches by subscribing to updates from the
 * policy resolver and information about endpoints from the endpoint
 * registry.
 */
62 public class PolicyManager
63 implements SwitchListener, PolicyListener, EndpointListener {
64 private static final Logger LOG =
65 LoggerFactory.getLogger(PolicyManager.class);
67 private final SwitchManager switchManager;
68 private final PolicyResolver policyResolver;
70 private final PolicyScope policyScope;
72 private final AtomicReference<Dirty> dirty;
74 private final ScheduledExecutorService executor;
75 private final SingletonTask flowUpdateTask;
78 * The flow tables that make up the processing pipeline
80 private final List<? extends OfTable> flowPipeline;
83 * The delay before triggering the flow update task in response to an
84 * event in milliseconds.
86 private final static int FLOW_UPDATE_DELAY = 250;
89 * Counter used to allocate ordinal values for forwarding contexts
92 private final AtomicInteger policyOrdinal = new AtomicInteger(1);
95 * Keep track of currently-allocated ordinals
97 // XXX For the endpoint groups, we need a globally unique ordinal, so
98 // should ultimately involve some sort of distributed agreement
99 // or a leader to allocate them. For now we'll just use a counter and
100 // this local map. Also theoretically need to garbage collect periodically
101 private final ConcurrentMap<String, Integer> ordinals =
102 new ConcurrentHashMap<>();
103 // XXX - need to garbage collect
104 private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals =
105 new ConcurrentHashMap<>();
/**
 * Create the policy manager: write the OF overlay subject feature
 * definitions to the operational datastore, build the flow table
 * pipeline and subscribe to policy, switch and endpoint events.
 *
 * @param dataBroker used to write the subject feature definitions; when
 *        {@code null} that write is skipped
 * @param policyResolver the resolver this manager registers with and
 *        later queries for the current policy
 * @param switchManager the switch manager to listen to; when
 *        {@code null} no switch listener is registered and flow updates
 *        are never scheduled
 * @param endpointManager the endpoint manager to listen to
 * @param rpcRegistry handed to the {@link OfContext} shared by the
 *        flow tables
 * @param executor runs the flow update task and the per-switch updates
 */
public PolicyManager(DataBroker dataBroker,
                     PolicyResolver policyResolver,
                     SwitchManager switchManager,
                     EndpointManager endpointManager,
                     RpcProviderRegistry rpcRegistry,
                     ScheduledExecutorService executor) {
    this.switchManager = switchManager;
    this.executor = executor;
    this.policyResolver = policyResolver;

    // Every listener callback dereferences 'dirty' and (through
    // scheduleUpdate) 'flowUpdateTask'.  Assign both before 'this' is
    // handed to any other component, so that a callback arriving while
    // the constructor is still running cannot observe a null field.
    dirty = new AtomicReference<>(new Dirty());
    flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());

    if (dataBroker != null) {
        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
        t.put(LogicalDatastoreType.OPERATIONAL,
              InstanceIdentifier
                  .builder(SubjectFeatureDefinitions.class)
                  .build(),
              SubjectFeatures.OF_OVERLAY_FEATURES);
        t.submit();
    }

    OfContext ctx = new OfContext(dataBroker, rpcRegistry,
                                  this, policyResolver, switchManager,
                                  endpointManager, executor);
    // NOTE(review): the GroupTable stage, the t.submit() call above and
    // the initial scheduleUpdate() below sit on lines that were not
    // legible in the reviewed copy of this file; they are restored from
    // the surrounding structure and the import list - confirm against
    // version control.
    flowPipeline = ImmutableList.of(new PortSecurity(ctx),
                                    new GroupTable(ctx),
                                    new SourceMapper(ctx),
                                    new DestinationMapper(ctx),
                                    new PolicyEnforcer(ctx));

    policyScope = policyResolver.registerListener(this);
    if (switchManager != null) {
        switchManager.registerListener(this);
    }
    endpointManager.registerListener(this);

    scheduleUpdate();

    LOG.debug("Initialized OFOverlay policy manager");
}
155 public void switchReady(final NodeId nodeId) {
156 // WriteTransaction t = dataBroker.newWriteOnlyTransaction();
158 // NodeBuilder nb = new NodeBuilder()
160 // .addAugmentation(FlowCapableNode.class,
161 // new FlowCapableNodeBuilder()
163 // t.merge(LogicalDatastoreType.CONFIGURATION,
164 // FlowUtils.createNodePath(nodeId),
165 // nb.build(), true);
166 // ListenableFuture<Void> result = t.submit();
167 // Futures.addCallback(result,
168 // new FutureCallback<Void>() {
170 // public void onSuccess(Void result) {
171 // dirty.get().addNode(nodeId);
176 // public void onFailure(Throwable t) {
177 // LOG.error("Could not add switch {}", nodeId, t);
184 public void switchRemoved(NodeId sw) {
185 // XXX TODO purge switch flows
186 dirty.get().addNode(sw);
191 public void switchUpdated(NodeId sw) {
192 dirty.get().addNode(sw);
201 public void endpointUpdated(EpKey epKey) {
202 dirty.get().addEndpoint(epKey);
207 public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
208 dirty.get().addNodeEp(nodeId, epKey);
213 public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
214 dirty.get().addEndpointGroupEp(egKey, epKey);
215 policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
224 public void policyUpdated(Set<EgKey> updatedConsumers) {
225 for (EgKey key : updatedConsumers) {
226 dirty.get().addEndpointGroup(key);
236 * Set the learning mode to the specified value
237 * @param learningMode the learning mode to set
239 public void setLearningMode(LearningMode learningMode) {
244 * Get a unique ordinal for the given condition group, suitable for
245 * use in the data plane. This is unique only for this node, and not
247 * @param cg the {@link ConditionGroup}
248 * @return the unique ID
250 public int getCondGroupOrdinal(final ConditionGroup cg) {
251 if (cg == null) return 0;
252 Integer ord = cgOrdinals.get(cg);
254 ord = policyOrdinal.getAndIncrement();
255 Integer old = cgOrdinals.putIfAbsent(cg, ord);
256 if (old != null) ord = old;
258 return ord.intValue();
262 * Get a 32-bit context ordinal suitable for use in the OF data plane
263 * for the given policy item.
264 * @param tenantId the tenant ID of the element
265 * @param id the unique ID for the element
266 * @return the 32-bit ordinal value
268 public int getContextOrdinal(final TenantId tenantId,
269 final UniqueId id) throws Exception {
270 if (tenantId == null || id == null) return 0;
271 return getContextOrdinal(tenantId.getValue() + "|" + id.getValue());
275 * Get a 32-bit context ordinal suitable for use in the OF data plane
276 * for the given policy item.
277 * @param id the unique ID for the element
278 * @return the 32-bit ordinal value
280 public int getContextOrdinal(final String id) throws Exception {
282 Integer ord = ordinals.get(id);
284 ord = policyOrdinal.getAndIncrement();
285 Integer old = ordinals.putIfAbsent(id, ord);
286 if (old != null) ord = old;
288 return ord.intValue();
295 private void scheduleUpdate() {
296 if (switchManager != null) {
297 LOG.trace("Scheduling flow update task");
298 flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
303 * Update the flows on a particular switch
305 private class SwitchFlowUpdateTask implements Callable<Void> {
306 private final Dirty dirty;
307 private final NodeId nodeId;
309 public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
312 this.nodeId = nodeId;
316 public Void call() throws Exception {
317 if (!switchManager.isSwitchReady(nodeId)) return null;
318 PolicyInfo info = policyResolver.getCurrentPolicy();
319 if (info == null) return null;
320 for (OfTable table : flowPipeline) {
322 table.update(nodeId, info, dirty);
323 } catch (Exception e) {
324 LOG.error("Failed to write flow table {}",
325 table.getClass().getSimpleName(), e);
333 * Update all flows on all switches as needed. Note that this will block
334 * one of the threads on the executor.
337 private class FlowUpdateTask implements Runnable {
340 LOG.debug("Beginning flow update task");
342 Dirty d = dirty.getAndSet(new Dirty());
343 CompletionService<Void> ecs
344 = new ExecutorCompletionService<Void>(executor);
346 for (NodeId node : switchManager.getReadySwitches()) {
347 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(d, node);
351 for (int i = 0; i < n; i++) {
354 } catch (InterruptedException | ExecutionException e) {
355 LOG.error("Failed to update flow tables", e);
358 LOG.debug("Flow update completed");
363 * Dirty state since our last successful flow table sync.
365 public static class Dirty {
366 private Set<EpKey> endpoints;
367 private Set<NodeId> nodes;
368 private Set<EgKey> groups;
369 private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
370 private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
373 ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
374 endpoints = Collections.newSetFromMap(epmap);
375 ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
376 nodes = Collections.newSetFromMap(nomap);
377 ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
378 groups = Collections.newSetFromMap(grmap);
380 groupEps = new ConcurrentHashMap<>();
381 nodeEps = new ConcurrentHashMap<>();
384 public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
385 SetUtils.getNestedSet(egKey, groupEps)
388 public void addNodeEp(NodeId id, EpKey epKey) {
389 SetUtils.getNestedSet(id, nodeEps).add(epKey);
391 public void addNode(NodeId id) {
394 public void addEndpointGroup(EgKey key) {
397 public void addEndpoint(EpKey epKey) {
398 endpoints.add(epKey);
401 public Set<EpKey> getEndpoints() {
405 public Set<NodeId> getNodes() {
409 public Set<EgKey> getGroups() {
413 public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
417 public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {