60b7e97927728c3920646586fbbb5ffdbb8808f5
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
47 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
48 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
49 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
52 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import com.google.common.base.Equivalence;
65 import com.google.common.base.Optional;
66 import com.google.common.base.Preconditions;
67 import com.google.common.collect.Collections2;
68 import com.google.common.collect.ImmutableList;
69 import com.google.common.collect.Sets;
70 import com.google.common.util.concurrent.CheckedFuture;
71 import com.google.common.util.concurrent.FutureCallback;
72 import com.google.common.util.concurrent.Futures;
73
74 /**
75  * Manage policies on switches by subscribing to updates from the
76  * policy resolver and information about endpoints from the endpoint
77  * registry
78  */
79 public class PolicyManager
80      implements SwitchListener, PolicyListener, EndpointListener {
81     private static final Logger LOG =
82             LoggerFactory.getLogger(PolicyManager.class);
83
84     private short tableOffset;
85     private static final short TABLEID_PORTSECURITY = 0;
86     private static final short TABLEID_INGRESS_NAT =  1;
87     private static final short TABLEID_SOURCE_MAPPER = 2;
88     private static final short TABLEID_DESTINATION_MAPPER = 3;
89     private static final short TABLEID_POLICY_ENFORCER = 4;
90     private static final short TABLEID_EGRESS_NAT = 5;
91     private static final short TABLEID_EXTERNAL_MAPPER = 6;
92
93     private final SwitchManager switchManager;
94     private final PolicyResolver policyResolver;
95
96     private final PolicyScope policyScope;
97
98     private final ScheduledExecutorService executor;
99     private final SingletonTask flowUpdateTask;
100     private final DataBroker dataBroker;
101
102     /**
103      * The flow tables that make up the processing pipeline
104      */
105     private final List<? extends OfTable> flowPipeline;
106
107     /**
108      * The delay before triggering the flow update task in response to an
109      * event in milliseconds.
110      */
111     private final static int FLOW_UPDATE_DELAY = 250;
112
113     public PolicyManager(DataBroker dataBroker,
114                          PolicyResolver policyResolver,
115                          SwitchManager switchManager,
116                          EndpointManager endpointManager,
117                          RpcProviderRegistry rpcRegistry,
118                          ScheduledExecutorService executor,
119                          short tableOffset) {
120         super();
121         this.switchManager = switchManager;
122         this.executor = executor;
123         this.policyResolver = policyResolver;
124         this.dataBroker = dataBroker;
125         this.tableOffset = tableOffset;
126
127
128         if (dataBroker != null) {
129             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
130             t.put(LogicalDatastoreType.OPERATIONAL,
131                   InstanceIdentifier
132                       .builder(SubjectFeatureDefinitions.class)
133                       .build(),
134                   SubjectFeatures.OF_OVERLAY_FEATURES);
135             t.submit();
136         }
137
138         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
139             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
140         }
141
142         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
143                                         this, policyResolver, switchManager,
144                                         endpointManager, executor);
145
146         flowPipeline = ImmutableList.of(new PortSecurity(ctx, (short)(tableOffset+TABLEID_PORTSECURITY)),
147                                         new GroupTable(ctx),
148                                         new IngressNatMapper(ctx, (short)(tableOffset+TABLEID_INGRESS_NAT)),
149                                         new SourceMapper(ctx, (short)(tableOffset+TABLEID_SOURCE_MAPPER)),
150                                         new DestinationMapper(ctx, (short)(tableOffset+TABLEID_DESTINATION_MAPPER)),
151                                         new PolicyEnforcer(ctx, (short)(tableOffset+TABLEID_POLICY_ENFORCER)),
152                                         new EgressNatMapper(ctx, (short)(tableOffset+TABLEID_EGRESS_NAT)),
153                                         new ExternalMapper(ctx, (short)(tableOffset+TABLEID_EXTERNAL_MAPPER))
154                                         );
155
156         policyScope = policyResolver.registerListener(this);
157         if (switchManager != null)
158             switchManager.registerListener(this);
159         endpointManager.registerListener(this);
160
161         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
162         scheduleUpdate();
163
164         LOG.debug("Initialized OFOverlay policy manager");
165     }
166
167     // **************
168     // SwitchListener
169     // **************
170
171
172     public short getTABLEID_PORTSECURITY() {
173         return (short)(tableOffset+TABLEID_PORTSECURITY);
174     }
175
176
177     public short getTABLEID_INGRESS_NAT() {
178         return (short)(tableOffset+TABLEID_INGRESS_NAT);
179     }
180
181
182     public short getTABLEID_SOURCE_MAPPER() {
183         return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
184     }
185
186
187     public short getTABLEID_DESTINATION_MAPPER() {
188         return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
189     }
190
191
192     public short getTABLEID_POLICY_ENFORCER() {
193         return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
194     }
195
196
197     public short getTABLEID_EGRESS_NAT() {
198         return (short)(tableOffset+TABLEID_EGRESS_NAT);
199     }
200
201
202     public short getTABLEID_EXTERNAL_MAPPER() {
203         return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
204     }
205
206     @Override
207     public void switchReady(final NodeId nodeId) {
208         scheduleUpdate();
209     }
210
211     @Override
212     public void switchRemoved(NodeId sw) {
213         // XXX TODO purge switch flows
214         scheduleUpdate();
215     }
216
217     @Override
218     public void switchUpdated(NodeId sw) {
219         scheduleUpdate();
220     }
221
222     // ****************
223     // EndpointListener
224     // ****************
225
226     @Override
227     public void endpointUpdated(EpKey epKey) {
228         scheduleUpdate();
229     }
230
231     @Override
232     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
233         scheduleUpdate();
234     }
235
236     @Override
237     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
238         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
239         scheduleUpdate();
240     }
241
242     // **************
243     // PolicyListener
244     // **************
245
246     @Override
247     public void policyUpdated(Set<EgKey> updatedConsumers) {
248         scheduleUpdate();
249     }
250
251     // *************
252     // PolicyManager
253     // *************
254
255     /**
256      * Set the learning mode to the specified value
257      * @param learningMode the learning mode to set
258      */
259     public void setLearningMode(LearningMode learningMode) {
260         // No-op for now
261     }
262
263     // **************
264     // Implementation
265     // **************
266
267     public class FlowMap{
268         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
269
270         public FlowMap() {
271         }
272
273         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
274             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
275             if(this.flowMap.get(tableIid) == null) {
276                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
277                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
278             }
279             return this.flowMap.get(tableIid);
280         }
281
282         public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
283             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
284             // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
285             List<Flow> flows = tableBuilder.getFlow();
286             Set<Equivalence.Wrapper<Flow>> wrappedFlows =
287                     new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
288
289             Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
290
291             if (!wrappedFlows.contains(wFlow)) {
292                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
293             } else {
294                 LOG.debug("Flow already exists in FlowMap - {}", flow);
295             }
296         }
297
298         public void commitToDataStore() {
299             if (dataBroker != null) {
300                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
301                     try {
302                         /*
303                          * Get the currently configured flows for
304                          * this table.
305                          */
306                         updateFlowTable(entry);
307                     } catch (Exception e) {
308                         LOG.warn("Couldn't read flow table {}", entry.getKey());
309                     }
310                 }
311             }
312         }
313
314         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
315                                      TableBuilder> entry)  throws Exception {
316             // flows to update
317             Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
318             // flows currently in the table
319             Set<Flow> curr = new HashSet<>();
320
321             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
322             Optional<Table> r =
323                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
324
325             if (r.isPresent()) {
326                 Table currentTable = r.get();
327                 curr = new HashSet<>(currentTable.getFlow());
328             }
329
330             // Sets with custom equivalence rules
331             Set<Equivalence.Wrapper<Flow>> oldFlows =
332                     new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
333             Set<Equivalence.Wrapper<Flow>> updatedFlows =
334                     new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
335
336             // what is still there but was not updated, needs to be deleted
337             Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
338                     Sets.difference(oldFlows, updatedFlows);
339             // new flows (they were not there before)
340             Sets.SetView<Equivalence.Wrapper<Flow>> additions =
341                     Sets.difference(updatedFlows, oldFlows);
342
343             if (!deletions.isEmpty()) {
344                 for (Equivalence.Wrapper<Flow> wf: deletions) {
345                     Flow f = wf.get();
346                     if (f != null) {
347                         t.delete(LogicalDatastoreType.CONFIGURATION,
348                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
349                     }
350                 }
351             }
352             if (!additions.isEmpty()) {
353                 for (Equivalence.Wrapper<Flow> wf: additions) {
354                     Flow f = wf.get();
355                     if (f != null) {
356                         t.put(LogicalDatastoreType.CONFIGURATION,
357                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
358                     }
359                 }
360             }
361             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
362             Futures.addCallback(f, new FutureCallback<Void>() {
363                 @Override
364                 public void onFailure(Throwable t) {
365                     LOG.error("Could not write flow table {}", t);
366                 }
367
368                 @Override
369                 public void onSuccess(Void result) {
370                     LOG.debug("Flow table updated.");
371                 }
372             });
373         }
374
375     }
376
377     private void scheduleUpdate() {
378         if (switchManager != null) {
379             LOG.trace("Scheduling flow update task");
380             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
381         }
382     }
383
384     /**
385      * Update the flows on a particular switch
386      */
387     private class SwitchFlowUpdateTask implements Callable<Void> {
388         private FlowMap flowMap;
389
390         public SwitchFlowUpdateTask(FlowMap flowMap) {
391             super();
392             this.flowMap = flowMap;
393         }
394
395         @Override
396         public Void call() throws Exception {
397             for (NodeId node : switchManager.getReadySwitches()) {
398                 PolicyInfo info = policyResolver.getCurrentPolicy();
399                 if (info == null)
400                     return null;
401                 for (OfTable table : flowPipeline) {
402                     try {
403                         table.update(node, info, flowMap);
404                     } catch (Exception e) {
405                         LOG.error("Failed to write flow table {}",
406                                 table.getClass().getSimpleName(), e);
407                     }
408                 }
409             }
410             return null;
411         }
412     }
413
414     /**
415      * Update all flows on all switches as needed.  Note that this will block
416      * one of the threads on the executor.
417      */
418     private class FlowUpdateTask implements Runnable {
419         @Override
420         public void run() {
421             LOG.debug("Beginning flow update task");
422
423             CompletionService<Void> ecs
424                 = new ExecutorCompletionService<>(executor);
425             int n = 0;
426
427             FlowMap flowMap = new FlowMap();
428
429             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
430             ecs.submit(swut);
431             n+=1;
432
433             for (int i = 0; i < n; i++) {
434                 try {
435                     ecs.take().get();
436                     flowMap.commitToDataStore();
437                 } catch (InterruptedException | ExecutionException e) {
438                     LOG.error("Failed to update flow tables", e);
439                 }
440             }
441             LOG.debug("Flow update completed");
442         }
443     }
444
445
446
447
448
449 }