Bug 3353 - added table offset to of-overlay-config
[groupbasedpolicy.git] renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
import org.opendaylight.groupbasedpolicy.resolver.EgKey;
import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Equivalence;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/**
 * Manage policies on switches by subscribing to updates from the
 * policy resolver and information about endpoints from the endpoint
 * registry.
 */
public class PolicyManager
        implements SwitchListener, PolicyListener, EndpointListener {
    private static final Logger LOG =
            LoggerFactory.getLogger(PolicyManager.class);

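    /*
     * Relative positions of the tables in the OF-Overlay pipeline. The concrete
     * OpenFlow table IDs are computed as tableOffset + TABLEID_* (see the
     * getTABLEID_* accessors below), which lets the whole pipeline be shifted
     * by the offset supplied via of-overlay-config (Bug 3353).
     */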
    private short tableOffset;
    private static final short TABLEID_PORTSECURITY = 0;
    private static final short TABLEID_INGRESS_NAT = 1;
    private static final short TABLEID_SOURCE_MAPPER = 2;
    private static final short TABLEID_DESTINATION_MAPPER = 3;
    private static final short TABLEID_POLICY_ENFORCER = 4;
    private static final short TABLEID_EGRESS_NAT = 5;
    private static final short TABLEID_EXTERNAL_MAPPER = 6;

    private final SwitchManager switchManager;
    private final PolicyResolver policyResolver;

    private final PolicyScope policyScope;

    private final ScheduledExecutorService executor;
    private final SingletonTask flowUpdateTask;
    private final DataBroker dataBroker;
    private final OfContext ofCtx;

    /**
     * The flow tables that make up the processing pipeline
     */
    private List<? extends OfTable> flowPipeline;

    /**
     * The delay before triggering the flow update task in response to an
     * event in milliseconds.
     */
    private final static int FLOW_UPDATE_DELAY = 250;

116
117     public PolicyManager(DataBroker dataBroker,
118                          PolicyResolver policyResolver,
119                          SwitchManager switchManager,
120                          EndpointManager endpointManager,
121                          RpcProviderRegistry rpcRegistry,
122                          ScheduledExecutorService executor,
123                          short tableOffset) {
124         super();
125         this.switchManager = switchManager;
126         this.executor = executor;
127         this.policyResolver = policyResolver;
128         this.dataBroker = dataBroker;
129         this.tableOffset = tableOffset;
130         try {
131             // to validate against model
132             verifyMaxTableId(tableOffset);
133         } catch (IllegalArgumentException e) {
134             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
135                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
136         }
137
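        /*
         * Publish the subject features supported by this renderer
         * (SubjectFeatures.OF_OVERLAY_FEATURES) to the operational data store.
         */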
        if (dataBroker != null) {
            WriteTransaction t = dataBroker.newWriteOnlyTransaction();
            t.put(LogicalDatastoreType.OPERATIONAL,
                  InstanceIdentifier
                      .builder(SubjectFeatureDefinitions.class)
                      .build(),
                  SubjectFeatures.OF_OVERLAY_FEATURES);
            t.submit();
        }

        for (Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
            policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
        }

        ofCtx = new OfContext(dataBroker, rpcRegistry,
                              this, policyResolver, switchManager,
                              endpointManager, executor);

        flowPipeline = createFlowPipeline();

        policyScope = policyResolver.registerListener(this);
        if (switchManager != null)
            switchManager.registerListener(this);
        endpointManager.registerListener(this);

        flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
        scheduleUpdate();

        LOG.debug("Initialized OFOverlay policy manager");
    }

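    /**
     * Build the ordered list of tables that make up the OF-Overlay pipeline.
     * Apart from PortSecurity, which stays in table 0 (see the TODO below),
     * every table is placed at tableOffset + its relative position; e.g. with
     * a tableOffset of 2 the IngressNatMapper ends up in table 3 and the
     * ExternalMapper in table 8.
     */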
    private List<? extends OfTable> createFlowPipeline() {
        // TODO - PORTSECURITY is kept in table 0.
        // According to the OpenFlow spec, processing on a vSwitch always starts from table 0.
        // Packets would be dropped if table 0 were empty.
        // Alternative workaround - table-miss flow entries in table 0.
        return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
                                new GroupTable(ofCtx),
                                new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
                                new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
                                new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
                                new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
                                new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
                                new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
                                );
    }

    /**
     * Change the table offset of the OpenFlow pipeline and re-sync the flow tables.
     *
     * @param tableOffset - new offset value
     * @return ListenableFuture<Void> - to indicate that the tables have been synced
     */
    public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
        try {
            verifyMaxTableId(tableOffset);
        } catch (IllegalArgumentException e) {
            LOG.error("Cannot update table offset. Max. table ID would be out of range.", e);
            // TODO - invalid offset value remains in conf DS
            // It's not possible to validate the offset value by using constraints in the model,
            // because the number of tables in the pipeline varies.
            return Futures.immediateFuture(null);
        }
        List<Short> tableIDs = getTableIDs();
        this.tableOffset = tableOffset;
        return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {

            @Override
            public Void apply(Void tablesRemoved) {
                flowPipeline = createFlowPipeline();
                scheduleUpdate();
                return null;
            }
        });
    }

    /**
     * @param tableIDs - IDs of the tables to delete
     * @return ListenableFuture<Void> - completes when the clearing is done
     */
    private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
        List<ListenableFuture<Void>> checkList = new ArrayList<>();
        final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
        for (Short tableId : tableIDs) {
            for (NodeId nodeId : switchManager.getReadySwitches()) {
                final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
                checkList.add(deleteTableIfExists(rwTx, tablePath));
            }
        }
        ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
        return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {

            @Override
            public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
                return rwTx.submit();
            }
        });
    }

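    /**
     * Collect the concrete table IDs currently used by the pipeline (based on
     * the current tableOffset) so that they can be removed before the offset
     * changes.
     */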
    private List<Short> getTableIDs() {
        List<Short> tableIds = new ArrayList<>();
        tableIds.add(getTABLEID_PORTSECURITY());
        tableIds.add(getTABLEID_INGRESS_NAT());
        tableIds.add(getTABLEID_SOURCE_MAPPER());
        tableIds.add(getTABLEID_DESTINATION_MAPPER());
        tableIds.add(getTABLEID_POLICY_ENFORCER());
        tableIds.add(getTABLEID_EGRESS_NAT());
        tableIds.add(getTABLEID_EXTERNAL_MAPPER());
        return tableIds;
    }

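    /**
     * Schedule the deletion of the table at the given path within the supplied
     * transaction, but only if the table currently exists in the CONFIGURATION
     * data store. The transaction is not submitted here.
     */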
    private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx,
            final InstanceIdentifier<Table> tablePath) {
        return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath),
                new Function<Optional<Table>, Void>() {

                    @Override
                    public Void apply(Optional<Table> optTable) {
                        if (optTable.isPresent()) {
                            rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
                        }
                        return null;
                    }
                });
    }
    public short getTABLEID_PORTSECURITY() {
        return (short) (tableOffset + TABLEID_PORTSECURITY);
    }

    public short getTABLEID_INGRESS_NAT() {
        return (short) (tableOffset + TABLEID_INGRESS_NAT);
    }

    public short getTABLEID_SOURCE_MAPPER() {
        return (short) (tableOffset + TABLEID_SOURCE_MAPPER);
    }

    public short getTABLEID_DESTINATION_MAPPER() {
        return (short) (tableOffset + TABLEID_DESTINATION_MAPPER);
    }

    public short getTABLEID_POLICY_ENFORCER() {
        return (short) (tableOffset + TABLEID_POLICY_ENFORCER);
    }

    public short getTABLEID_EGRESS_NAT() {
        return (short) (tableOffset + TABLEID_EGRESS_NAT);
    }

    public short getTABLEID_EXTERNAL_MAPPER() {
        return (short) (tableOffset + TABLEID_EXTERNAL_MAPPER);
    }

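    /**
     * Compute the highest table ID the pipeline would use with the given offset.
     * Constructing the TableId performs the range check against the OpenFlow
     * table-types model, so an offset that pushes the pipeline out of range
     * fails with an IllegalArgumentException.
     */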
    public TableId verifyMaxTableId(short tableOffset) {
        return new TableId((short) (tableOffset + TABLEID_EXTERNAL_MAPPER));
    }

    // **************
    // SwitchListener
    // **************

    @Override
    public void switchReady(final NodeId nodeId) {
        scheduleUpdate();
    }

    @Override
    public void switchRemoved(NodeId sw) {
        // XXX TODO purge switch flows
        scheduleUpdate();
    }

    @Override
    public void switchUpdated(NodeId sw) {
        scheduleUpdate();
    }

    // ****************
    // EndpointListener
    // ****************

    @Override
    public void endpointUpdated(EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
        scheduleUpdate();
    }

    // **************
    // PolicyListener
    // **************

    @Override
    public void policyUpdated(Set<EgKey> updatedConsumers) {
        scheduleUpdate();
    }

    // *************
    // PolicyManager
    // *************

    /**
     * Set the learning mode to the specified value.
     * @param learningMode the learning mode to set
     */
    public void setLearningMode(LearningMode learningMode) {
        // No-op for now
    }

    // **************
    // Implementation
    // **************

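    /**
     * Accumulates the desired flows per (node, table) while the pipeline is
     * being evaluated, and then reconciles them with what is already present
     * in the CONFIGURATION data store.
     */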
    public class FlowMap {
        private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();

        public FlowMap() {
        }

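        /**
         * Return the TableBuilder for the given node and table ID, lazily
         * creating an empty one (with an empty flow list) on first access.
         */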
        public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
            InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
            if (this.flowMap.get(tableIid) == null) {
                this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
                this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
            }
            return this.flowMap.get(tableIid);
        }

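        /**
         * Add a flow to the table builder for the given node/table unless an
         * equivalent flow (as defined by EquivalenceFabric.FLOW_EQUIVALENCE) is
         * already present.
         */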
        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
            TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
            // transforming List<Flow> to a Set (with customized equals/hashCode) to eliminate duplicate entries
            List<Flow> flows = tableBuilder.getFlow();
            Set<Equivalence.Wrapper<Flow>> wrappedFlows =
                    new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));

            Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);

            if (!wrappedFlows.contains(wFlow)) {
                tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
            } else {
                LOG.debug("Flow already exists in FlowMap - {}", flow);
            }
        }

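        /**
         * Push every accumulated table to the CONFIGURATION data store, table
         * by table, logging (and skipping) tables that cannot be updated.
         */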
        public void commitToDataStore() {
            if (dataBroker != null) {
                for (Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
                    try {
                        /*
                         * Read the currently configured flows for this table
                         * and write the differences.
                         */
                        updateFlowTable(entry);
                    } catch (Exception e) {
                        LOG.warn("Couldn't update flow table {}", entry.getKey());
                    }
                }
            }
        }

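        /**
         * Reconcile the desired flows for one table with what is currently in
         * the CONFIGURATION data store: flows missing from the new set are
         * deleted and new flows are added, all within a single read-write
         * transaction. Flow comparison uses the custom equivalence from
         * EquivalenceFabric.
         */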
        private void updateFlowTable(Entry<InstanceIdentifier<Table>, TableBuilder> entry)
                throws Exception {
            // flows to update
            Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
            // flows currently in the table
            Set<Flow> curr = new HashSet<>();

            ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
            Optional<Table> r =
                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();

            if (r.isPresent()) {
                Table currentTable = r.get();
                curr = new HashSet<>(currentTable.getFlow());
            }

            // Sets with custom equivalence rules
            Set<Equivalence.Wrapper<Flow>> oldFlows =
                    new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
            Set<Equivalence.Wrapper<Flow>> updatedFlows =
                    new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));

            // what is still there but was not updated needs to be deleted
            Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
                    Sets.difference(oldFlows, updatedFlows);
            // new flows (they were not there before)
            Sets.SetView<Equivalence.Wrapper<Flow>> additions =
                    Sets.difference(updatedFlows, oldFlows);

            if (!deletions.isEmpty()) {
                for (Equivalence.Wrapper<Flow> wf : deletions) {
                    Flow f = wf.get();
                    if (f != null) {
                        t.delete(LogicalDatastoreType.CONFIGURATION,
                                FlowUtils.createFlowPath(entry.getKey(), f.getId()));
                    }
                }
            }
            if (!additions.isEmpty()) {
                for (Equivalence.Wrapper<Flow> wf : additions) {
                    Flow f = wf.get();
                    if (f != null) {
                        t.put(LogicalDatastoreType.CONFIGURATION,
                                FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
                    }
                }
            }
            CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
            Futures.addCallback(f, new FutureCallback<Void>() {
                @Override
                public void onFailure(Throwable t) {
                    LOG.error("Could not write flow table", t);
                }

                @Override
                public void onSuccess(Void result) {
                    LOG.debug("Flow table updated.");
                }
            });
        }

    }

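    /**
     * Debounce flow updates: (re)schedule the single flow update task to run
     * after FLOW_UPDATE_DELAY milliseconds, so that bursts of events are
     * coalesced into one recomputation.
     */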
    private void scheduleUpdate() {
        if (switchManager != null) {
            LOG.trace("Scheduling flow update task");
            flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Update the flows on all ready switches
     */
    private class SwitchFlowUpdateTask implements Callable<Void> {
        private FlowMap flowMap;

        public SwitchFlowUpdateTask(FlowMap flowMap) {
            super();
            this.flowMap = flowMap;
        }

        @Override
        public Void call() throws Exception {
            for (NodeId node : switchManager.getReadySwitches()) {
                PolicyInfo info = policyResolver.getCurrentPolicy();
                if (info == null)
                    return null;
                for (OfTable table : flowPipeline) {
                    try {
                        table.update(node, info, flowMap);
                    } catch (Exception e) {
                        LOG.error("Failed to write flow table {}",
                                table.getClass().getSimpleName(), e);
                    }
                }
            }
            return null;
        }
    }

    /**
     * Update all flows on all switches as needed.  Note that this will block
     * one of the threads on the executor.
     */
    private class FlowUpdateTask implements Runnable {
        @Override
        public void run() {
            LOG.debug("Beginning flow update task");

            CompletionService<Void> ecs
                = new ExecutorCompletionService<>(executor);
            int n = 0;

            FlowMap flowMap = new FlowMap();

            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
            ecs.submit(swut);
            n += 1;

            for (int i = 0; i < n; i++) {
                try {
                    ecs.take().get();
                    flowMap.commitToDataStore();
                } catch (InterruptedException | ExecutionException e) {
                    LOG.error("Failed to update flow tables", e);
                }
            }
            LOG.debug("Flow update completed");
        }
    }
}