Added range type to subject-feature-definition/parameter
[groupbasedpolicy.git] / groupbasedpolicy / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CompletionService;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorCompletionService;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicReference;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable.OfTableCtx;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
38 import org.opendaylight.groupbasedpolicy.resolver.ConditionGroup;
39 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
44 import org.opendaylight.groupbasedpolicy.util.SetUtils;
45 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.TenantId;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.UniqueId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.google.common.collect.ImmutableList;
56
57 /**
58  * Manage policies on switches by subscribing to updates from the 
59  * policy resolver and information about endpoints from the endpoint 
60  * registry
61  * @author readams
62  */
63 public class PolicyManager 
64      implements SwitchListener, PolicyListener, EndpointListener {
65     private static final Logger LOG = 
66             LoggerFactory.getLogger(PolicyManager.class);
67
68     private final SwitchManager switchManager;
69     private final PolicyResolver policyResolver;
70     
71     private final PolicyScope policyScope;
72     
73     private final AtomicReference<Dirty> dirty;
74     
75     private final ScheduledExecutorService executor;
76     private final SingletonTask flowUpdateTask;
77
78     /**
79      * The flow tables that make up the processing pipeline
80      */
81     private final List<? extends OfTable> flowPipeline;
82
83     /**
84      * The delay before triggering the flow update task in response to an
85      * event in milliseconds.
86      */
87     private final static int FLOW_UPDATE_DELAY = 250;
88
89     /**
90      * Counter used to allocate ordinal values for forwarding contexts
91      * and VNIDs
92      */
93     private final AtomicInteger policyOrdinal = new AtomicInteger(1);
94     
95     /**
96      * Keep track of currently-allocated ordinals
97      */
98     // XXX For the endpoint groups, we need a globally unique ordinal, so
99     // should ultimately involve some sort of distributed agreement
100     // or a leader to allocate them.  For now we'll just use a counter and
101     // this local map.  Also theoretically need to garbage collect periodically
102     private final ConcurrentMap<String, Integer> ordinals = 
103             new ConcurrentHashMap<>();
104     // XXX - need to garbage collect
105     private final ConcurrentMap<ConditionGroup, Integer> cgOrdinals = 
106             new ConcurrentHashMap<>();
107             
108     public PolicyManager(DataBroker dataBroker,
109                          PolicyResolver policyResolver,
110                          SwitchManager switchManager,
111                          EndpointManager endpointManager, 
112                          RpcProviderRegistry rpcRegistry,
113                          ScheduledExecutorService executor) {
114         super();
115         this.switchManager = switchManager;
116         this.executor = executor;
117         this.policyResolver = policyResolver;
118
119         if (dataBroker != null) {
120             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
121             t.put(LogicalDatastoreType.OPERATIONAL, 
122                   InstanceIdentifier
123                       .builder(SubjectFeatureDefinitions.class)
124                       .build(),
125                   SubjectFeatures.OF_OVERLAY_FEATURES);
126             t.submit();
127         }
128
129         OfTableCtx ctx = new OfTableCtx(dataBroker, rpcRegistry, 
130                                         this, policyResolver, switchManager, 
131                                         endpointManager, executor);
132         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
133                                         new GroupTable(ctx),
134                                         new SourceMapper(ctx),
135                                         new DestinationMapper(ctx),
136                                         new PolicyEnforcer(ctx));
137
138         policyScope = policyResolver.registerListener(this);
139         if (switchManager != null)
140             switchManager.registerListener(this);
141         endpointManager.registerListener(this);
142         
143         dirty = new AtomicReference<>(new Dirty());
144         
145         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
146         scheduleUpdate();
147         
148         LOG.debug("Initialized OFOverlay policy manager");
149     }
150
151     // **************
152     // SwitchListener
153     // **************
154
155     @Override
156     public void switchReady(final NodeId nodeId) {
157 //        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
158 //        
159 //        NodeBuilder nb = new NodeBuilder()
160 //            .setId(nodeId)
161 //            .addAugmentation(FlowCapableNode.class, 
162 //                             new FlowCapableNodeBuilder()
163 //                                .build());
164 //        t.merge(LogicalDatastoreType.CONFIGURATION, 
165 //                FlowUtils.createNodePath(nodeId),
166 //                nb.build(), true);
167 //        ListenableFuture<Void> result = t.submit();
168 //        Futures.addCallback(result, 
169 //                            new FutureCallback<Void>() {
170 //            @Override
171 //            public void onSuccess(Void result) {
172 //                dirty.get().addNode(nodeId);
173 //                scheduleUpdate();
174 //            }
175 //
176 //            @Override
177 //            public void onFailure(Throwable t) {
178 //                LOG.error("Could not add switch {}", nodeId, t);
179 //            }
180 //        });
181         
182     }
183
184     @Override
185     public void switchRemoved(NodeId sw) {
186         // XXX TODO purge switch flows
187         dirty.get().addNode(sw);
188         scheduleUpdate();
189     }
190     
191     @Override
192     public void switchUpdated(NodeId sw) {
193         dirty.get().addNode(sw);
194         scheduleUpdate();
195     }
196
197     // ****************
198     // EndpointListener
199     // ****************
200     
201     @Override
202     public void endpointUpdated(EpKey epKey) {
203         dirty.get().addEndpoint(epKey);
204         scheduleUpdate();
205     }
206
207     @Override
208     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
209         dirty.get().addNodeEp(nodeId, epKey);
210         scheduleUpdate();
211     }
212
213     @Override
214     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
215         dirty.get().addEndpointGroupEp(egKey, epKey);
216         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
217         scheduleUpdate();
218     }
219
220     // **************
221     // PolicyListener
222     // **************
223     
224     @Override
225     public void policyUpdated(Set<EgKey> updatedConsumers) {
226         for (EgKey key : updatedConsumers) {
227             dirty.get().addEndpointGroup(key);
228         }
229         scheduleUpdate();
230     }
231
232     // *************
233     // PolicyManager
234     // *************
235
236     /**
237      * Set the learning mode to the specified value
238      * @param learningMode the learning mode to set
239      */
240     public void setLearningMode(LearningMode learningMode) {
241         // No-op for now
242     }
243
244     /**
245      * Get a unique ordinal for the given condition group, suitable for
246      * use in the data plane.  This is unique only for this node, and not 
247      * globally.
248      * @param cg the {@link ConditionGroup}
249      * @return the unique ID
250      */
251     public int getCondGroupOrdinal(final ConditionGroup cg) {
252         if (cg == null) return 0;
253         Integer ord = cgOrdinals.get(cg);
254         if (ord == null) {
255             ord = policyOrdinal.getAndIncrement();
256             Integer old = cgOrdinals.putIfAbsent(cg, ord);
257             if (old != null) ord = old; 
258         }
259         return ord.intValue();
260     }
261     
262     /**
263      * Get a 32-bit context ordinal suitable for use in the OF data plane
264      * for the given policy item. 
265      * @param tenantId the tenant ID of the element
266      * @param id the unique ID for the element
267      * @return the 32-bit ordinal value
268      */
269     public int getContextOrdinal(final TenantId tenantId, 
270                                  final UniqueId id) throws Exception {
271         if (tenantId == null || id == null) return 0;
272         return getContextOrdinal(tenantId.getValue() + "|" + id.getValue());
273     }
274
275     /**
276      * Get a 32-bit context ordinal suitable for use in the OF data plane
277      * for the given policy item.
278      * @param id the unique ID for the element
279      * @return the 32-bit ordinal value
280      */
281     public int getContextOrdinal(final String id) throws Exception {
282
283         Integer ord = ordinals.get(id);
284         if (ord == null) {
285             ord = policyOrdinal.getAndIncrement();
286             Integer old = ordinals.putIfAbsent(id, ord);
287             if (old != null) ord = old;
288         }
289         return ord.intValue();
290     }
291     
292     // **************
293     // Implementation
294     // **************
295
296     private void scheduleUpdate() {
297         if (switchManager != null) {
298             LOG.trace("Scheduling flow update task");
299             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
300         }
301     }
302     
303     /**
304      * Update the flows on a particular switch
305      */
306     private class SwitchFlowUpdateTask implements Callable<Void> {
307         private final Dirty dirty;
308         private final NodeId nodeId;
309
310         public SwitchFlowUpdateTask(Dirty dirty, NodeId nodeId) {
311             super();
312             this.dirty = dirty;
313             this.nodeId = nodeId;
314         }
315
316         @Override
317         public Void call() throws Exception {
318             if (!switchManager.isSwitchReady(nodeId)) return null;
319             PolicyInfo info = policyResolver.getCurrentPolicy();
320             if (info == null) return null;
321             for (OfTable table : flowPipeline) {
322                 try {
323                     table.update(nodeId, info, dirty);
324                 } catch (Exception e) {
325                     LOG.error("Failed to write flow table {}", 
326                               table.getClass().getSimpleName(), e);
327                 }
328             }
329             return null;
330         }
331     }
332
333     /**
334      * Update all flows on all switches as needed.  Note that this will block
335      * one of the threads on the executor.
336      * @author readams
337      */
338     private class FlowUpdateTask implements Runnable {
339         @Override
340         public void run() {
341             LOG.debug("Beginning flow update task");
342
343             Dirty d = dirty.getAndSet(new Dirty());
344             CompletionService<Void> ecs
345                 = new ExecutorCompletionService<Void>(executor);
346             int n = 0;
347             for (NodeId node : switchManager.getReadySwitches()) {
348                 SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(d, node);
349                 ecs.submit(swut);
350                 n += 1;
351             }
352             for (int i = 0; i < n; i++) {
353                 try {
354                     ecs.take().get();
355                 } catch (InterruptedException | ExecutionException e) {
356                     LOG.error("Failed to update flow tables", e);
357                 }
358             }
359             LOG.debug("Flow update completed");
360         }
361     }
362     
363     /**
364      * Dirty state since our last successful flow table sync.
365      */
366     public static class Dirty {
367         private Set<EpKey> endpoints;
368         private Set<NodeId> nodes;
369         private Set<EgKey> groups;
370         private ConcurrentMap<EgKey, Set<EpKey>> groupEps;
371         private ConcurrentMap<NodeId, Set<EpKey>> nodeEps;
372         
373         public Dirty() {
374             ConcurrentHashMap<EpKey,Boolean> epmap = new ConcurrentHashMap<>();
375             endpoints = Collections.newSetFromMap(epmap);
376             ConcurrentHashMap<NodeId,Boolean> nomap = new ConcurrentHashMap<>();
377             nodes = Collections.newSetFromMap(nomap);
378             ConcurrentHashMap<EgKey,Boolean> grmap = new ConcurrentHashMap<>();
379             groups = Collections.newSetFromMap(grmap);
380
381             groupEps = new ConcurrentHashMap<>();
382             nodeEps = new ConcurrentHashMap<>();
383         }
384         
385         public void addEndpointGroupEp(EgKey egKey, EpKey epKey) {
386             SetUtils.getNestedSet(egKey, groupEps)
387                 .add(epKey);
388         }
389         public void addNodeEp(NodeId id, EpKey epKey) {
390             SetUtils.getNestedSet(id, nodeEps).add(epKey);
391         }
392         public void addNode(NodeId id) {
393             nodes.add(id);
394         }
395         public void addEndpointGroup(EgKey key) {
396             groups.add(key);
397         }
398         public void addEndpoint(EpKey epKey) {
399             endpoints.add(epKey);
400         }
401
402         public Set<EpKey> getEndpoints() {
403             return endpoints;
404         }
405
406         public Set<NodeId> getNodes() {
407             return nodes;
408         }
409
410         public Set<EgKey> getGroups() {
411             return groups;
412         }
413
414         public ConcurrentMap<EgKey, Set<EpKey>> getGroupEps() {
415             return groupEps;
416         }
417
418         public ConcurrentMap<NodeId, Set<EpKey>> getNodeEps() {
419             return nodeEps;
420         }
421         
422     }
423 }