Merge "Move utility function to common place."
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CompletionService;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorCompletionService;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicReference;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
37 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
43 import org.opendaylight.groupbasedpolicy.util.SetUtils;
44 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.collect.ImmutableList;
55
56 /**
57  * Manage policies on switches by subscribing to updates from the 
58  * policy resolver and information about endpoints from the endpoint 
59  * registry
60  * @author readams
61  */
62 public class PolicyManager 
63      implements SwitchListener, PolicyListener, EndpointListener {
64     private static final Logger LOG = 
65             LoggerFactory.getLogger(PolicyManager.class);
66
67     private final SwitchManager switchManager;
68     private final PolicyResolver policyResolver;
69     
70     private final PolicyScope policyScope;
71     
72     private final AtomicReference<Dirty> dirty;
73     
74     private final ScheduledExecutorService executor;
75     private final SingletonTask flowUpdateTask;
76
77     /**
78      * The flow tables that make up the processing pipeline
79      */
80     private final List<? extends OfTable> flowPipeline;
81
82     /**
83      * The delay before triggering the flow update task in response to an
84      * event in milliseconds.
85      */
86     private final static int FLOW_UPDATE_DELAY = 250;
87
88     /**
89      * Counter used to allocate ordinal values for forwarding contexts
90      * and VNIDs
91      */
92     private final AtomicInteger policyOrdinal = new AtomicInteger(1);
93     
94     /**
95      * Keep track of currently-allocated ordinals
96      */
97     // XXX For the endpoint groups, we need a globally unique ordinal, so
98     // should ultimately involve some sort of distributed agreement
99     // or a leader to allocate them.  For now we'll just use a counter and
100     // this local map.  Also theoretically need to garbage collect periodically
101     private final ConcurrentMap<String, Integer> ordinals = 
102             new ConcurrentHashMap<>();
103     // XXX - need to garbage collect
104     private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals = 
105             new ConcurrentHashMap<>();
106             
107     public PolicyManager(DataBroker dataBroker,
108                          PolicyResolver policyResolver,
109                          SwitchManager switchManager,
110                          EndpointManager endpointManager, 
111                          RpcProviderRegistry rpcRegistry,
112                          ScheduledExecutorService executor) {
113         super();
114         this.switchManager = switchManager;
115         this.executor = executor;
116         this.policyResolver = policyResolver;
117
118         if (dataBroker != null) {
119             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
120             t.put(LogicalDatastoreType.OPERATIONAL, 
121                   InstanceIdentifier
122                       .builder(SubjectFeatureDefinitions.class)
123                       .build(),
124                   SubjectFeatures.OF_OVERLAY_FEATURES);
125             t.submit();
126         }
127
128         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
129                                         this, policyResolver, switchManager, 
130                                         endpointManager, executor);
131         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
132                                         new GroupTable(ctx),
133                                         new SourceMapper(ctx),
134                                         new DestinationMapper(ctx),
135                                         new PolicyEnforcer(ctx));
136
137         policyScope = policyResolver.registerListener(this);
138         if (switchManager != null)
139             switchManager.registerListener(this);
140         endpointManager.registerListener(this);
141         
142         dirty = new AtomicReference<>(new Dirty());
143         
144         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
145         scheduleUpdate();
146         
147         LOG.debug("Initialized OFOverlay policy manager");
148     }
149
150     // **************
151     // SwitchListener
152     // **************
153
154     @Override
155     public void switchReady(final NodeId nodeId) {
156 //        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
157 //        
158 //        NodeBuilder nb = new NodeBuilder()
159 //            .setId(nodeId)
160 //            .addAugmentation(FlowCapableNode.class, 
161 //                             new FlowCapableNodeBuilder()
162 //                                .build());
163 //        t.merge(LogicalDatastoreType.CONFIGURATION, 
164 //                FlowUtils.createNodePath(nodeId),
165 //                nb.build(), true);
166 //        ListenableFuture<Void> result = t.submit();
167 //        Futures.addCallback(result, 
168 //                            new FutureCallback<Void>() {
169 //            @Override
170 //            public void onSuccess(Void result) {
171 //                dirty.get().addNode(nodeId);
172 //                scheduleUpdate();
173 //            }
174 //
175 //            @Override
176 //            public void onFailure(Throwable t) {
177 //                LOG.error("Could not add switch {}", nodeId, t);
178 //            }
179 //        });
180         
181     }
182
183     @Override
184     public void switchRemoved(NodeId sw) {
185         // XXX TODO purge switch flows
186         dirty.get().addNode(sw);
187         scheduleUpdate();
188     }
189     
190     @Override
191     public void switchUpdated(NodeId sw) {
192         dirty.get().addNode(sw);
193         scheduleUpdate();
194     }
195
196     // ****************
197     // EndpointListener
198     // ****************
199     
200     @Override
201     public void endpointUpdated(EpKey epKey) {
202         dirty.get().addEndpoint(epKey);
203         scheduleUpdate();
204     }
205
206     @Override
207     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
208         dirty.get().addNodeEp(nodeId, epKey);
209         scheduleUpdate();
210     }
211
212     @Override
213     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
214         dirty.get().addEndpointGroupEp(egKey, epKey);
215         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
216         scheduleUpdate();
217     }
218
219     // **************
220     // PolicyListener
221     // **************
222     
223     @Override
224     public void policyUpdated(Set<EgKey> updatedConsumers) {
225         for (EgKey key : updatedConsumers) {
226             dirty.get().addEndpointGroup(key);
227         }
228         scheduleUpdate();
229     }
230
231     // *************
232     // PolicyManager
233     // *************
234
235     /**
236      * Set the learning mode to the specified value
237      * @param learningMode the learning mode to set
238      */
239     public void setLearningMode(LearningMode learningMode) {
240         // No-op for now
241     }
242
243     /**
244      * Get a unique ordinal for the given condition group, suitable for
245      * use in the data plane.  This is unique only for this node, and not 
246      * globally.
247      * @param cg the {@link ConditionGroup}
248      * @return the unique ID
249      */
250     public int getCondGroupOrdinal(final ConditionGroup cg) {
251         if (cg == null) return 0;
252         Integer ord = cgOrdinals.get(cg);
253         if (ord == null) {
254             ord = policyOrdinal.getAndIncrement();
255             Integer old = cgOrdinals.putIfAbsent(cg, ord);
256             if (old != null) ord = old; 
257         }
258         return ord.intValue();
259     }
260     
261     /**
262      * Get a 32-bit context ordinal suitable for use in the OF data plane
263      * for the given policy item. 
264      * @param tenantId the tenant ID of the element
265      * @param id the unique ID for the element
266      * @return the 32-bit ordinal value
267      */
268     public int getContextOrdinal(final TenantId tenantId, 
269                                  final UniqueId id) throws Exception {
270         if (tenantId == null || id == null) return 0;
271         return getContextOrdinal(tenantId.getValue() + "|" + id.getValue());
272     }
273
274     /**
275      * Get a 32-bit context ordinal suitable for use in the OF data plane
276      * for the given policy item.
277      * @param id the unique ID for the element
278      * @return the 32-bit ordinal value
279      */
280     public int getContextOrdinal(final String id) throws Exception {
281
282         Integer ord = ordinals.get(id);
283         if (ord == null) {
284             ord = policyOrdinal.getAndIncrement();
285             Integer old = ordinals.putIfAbsent(id, ord);
286             if (old != null) ord = old;
287         }
288         return ord.intValue();
289     }
290     
291     // **************
292     // Implementation
293     // **************
294
295     private void scheduleUpdate() {
296         if (switchManager != null) {
297             LOG.trace("Scheduling flow update task");
298             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
299         }
300     }
301     
302     /**
303      * Update the flows on a particular switch
304      */
305     private class SwitchFlowUpdateTask implements Callable<Void> {
306         private final Dirty dirty;
307         private final NodeId nodeId;
308
309         public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
310             super();
311             this.dirty = dirty;
312             this.nodeId = nodeId;
313         }
314
315         @Override
316         public Void call() throws Exception {
317             if (!switchManager.isSwitchReady(nodeId)) return null;
318             PolicyInfo info = policyResolver.getCurrentPolicy();
319             if (info == null) return null;
320             for (OfTable table : flowPipeline) {
321                 try {
322                     table.update(nodeId, info, dirty);
323                 } catch (Exception e) {
324                     LOG.error("Failed to write flow table {}", 
325                               table.getClass().getSimpleName(), e);
326                 }
327             }
328             return null;
329         }
330     }
331
332     /**
333      * Update all flows on all switches as needed.  Note that this will block
334      * one of the threads on the executor.
335      * @author readams
336      */
337     private class FlowUpdateTask implements Runnable {
338         @Override
339         public void run() {
340             LOG.debug("Beginning flow update task");
341
342             Dirty d = dirty.getAndSet(new Dirty());
343             CompletionService<Void> ecs
344                 = new ExecutorCompletionService<Void>(executor);
345             int n = 0;
346             for (NodeId node : switchManager.getReadySwitches()) {
347                 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(d, node);
348                 ecs.submit(swut);
349                 n += 1;
350             }
351             for (int i = 0; i < n; i++) {
352                 try {
353                     ecs.take().get();
354                 } catch (InterruptedException | ExecutionException e) {
355                     LOG.error("Failed to update flow tables", e);
356                 }
357             }
358             LOG.debug("Flow update completed");
359         }
360     }
361     
362     /**
363      * Dirty state since our last successful flow table sync.
364      */
365     public static class Dirty {
366         private Set<EpKey> endpoints;
367         private Set<NodeId> nodes;
368         private Set<EgKey> groups;
369         private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
370         private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
371         
372         public Dirty() {
373             ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
374             endpoints = Collections.newSetFromMap(epmap);
375             ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
376             nodes = Collections.newSetFromMap(nomap);
377             ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
378             groups = Collections.newSetFromMap(grmap);
379
380             groupEps = new ConcurrentHashMap<>();
381             nodeEps = new ConcurrentHashMap<>();
382         }
383         
384         public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
385             SetUtils.getNestedSet(egKey, groupEps)
386                 .add(epKey);
387         }
388         public void addNodeEp(NodeId id, EpKey epKey) {
389             SetUtils.getNestedSet(id, nodeEps).add(epKey);
390         }
391         public void addNode(NodeId id) {
392             nodes.add(id);
393         }
394         public void addEndpointGroup(EgKey key) {
395             groups.add(key);
396         }
397         public void addEndpoint(EpKey epKey) {
398             endpoints.add(epKey);
399         }
400
401         public Set<EpKey> getEndpoints() {
402             return endpoints;
403         }
404
405         public Set<NodeId> getNodes() {
406             return nodes;
407         }
408
409         public Set<EgKey> getGroups() {
410             return groups;
411         }
412
413         public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
414             return groupEps;
415         }
416
417         public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {
418             return nodeEps;
419         }
420         
421     }
422 }