Bug 3302: fix for GroupTable
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map.Entry;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorCompletionService;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21
22 import com.google.common.base.Function;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.util.concurrent.AsyncFunction;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
33 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
47 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
48 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
49 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
52 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
53 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * Manage policies on switches by subscribing to updates from the
66  * policy resolver and information about endpoints from the endpoint
67  * registry
68  */
69 public class PolicyManager
70      implements SwitchListener, PolicyListener, EndpointListener {
71     private static final Logger LOG =
72             LoggerFactory.getLogger(PolicyManager.class);
73
74     private short tableOffset;
75     private static final short TABLEID_PORTSECURITY = 0;
76     private static final short TABLEID_INGRESS_NAT =  1;
77     private static final short TABLEID_SOURCE_MAPPER = 2;
78     private static final short TABLEID_DESTINATION_MAPPER = 3;
79     private static final short TABLEID_POLICY_ENFORCER = 4;
80     private static final short TABLEID_EGRESS_NAT = 5;
81     private static final short TABLEID_EXTERNAL_MAPPER = 6;
82
83     private final SwitchManager switchManager;
84     private final PolicyResolver policyResolver;
85
86     private final PolicyScope policyScope;
87
88     private final ScheduledExecutorService executor;
89     private final SingletonTask flowUpdateTask;
90     private final DataBroker dataBroker;
91     private final OfContext ofCtx;
92     /**
93      * The flow tables that make up the processing pipeline
94      */
95     private List<? extends OfTable> flowPipeline;
96
97     /**
98      * The delay before triggering the flow update task in response to an
99      * event in milliseconds.
100      */
101     private final static int FLOW_UPDATE_DELAY = 250;
102
103     public PolicyManager(DataBroker dataBroker,
104                          PolicyResolver policyResolver,
105                          SwitchManager switchManager,
106                          EndpointManager endpointManager,
107                          RpcProviderRegistry rpcRegistry,
108                          ScheduledExecutorService executor,
109                          short tableOffset) {
110         super();
111         this.switchManager = switchManager;
112         this.executor = executor;
113         this.policyResolver = policyResolver;
114         this.dataBroker = dataBroker;
115         this.tableOffset = tableOffset;
116         try {
117             // to validate against model
118             verifyMaxTableId(tableOffset);
119         } catch (IllegalArgumentException e) {
120             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
121                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
122         }
123
124         if (dataBroker != null) {
125             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
126             t.put(LogicalDatastoreType.OPERATIONAL,
127                   InstanceIdentifier
128                       .builder(SubjectFeatureDefinitions.class)
129                       .build(),
130                   SubjectFeatures.OF_OVERLAY_FEATURES);
131             t.submit();
132         }
133
134         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
135             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
136         }
137
138         ofCtx = new OfContext(dataBroker, rpcRegistry,
139                                         this, policyResolver, switchManager,
140                                         endpointManager, executor);
141
142         flowPipeline = createFlowPipeline();
143
144         policyScope = policyResolver.registerListener(this);
145         if (switchManager != null)
146             switchManager.registerListener(this);
147         endpointManager.registerListener(this);
148
149         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
150         scheduleUpdate();
151
152         LOG.debug("Initialized OFOverlay policy manager");
153     }
154
155     private List<? extends OfTable> createFlowPipeline() {
156         // TODO - PORTSECURITY is kept in table 0.
157         // According to openflow spec,processing on vSwitch always starts from table 0.
158         // Packets will be droped if table 0 is empty.
159         // Alternative workaround - table-miss flow entries in table 0.
160         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
161                                         new GroupTable(ofCtx),
162                                         new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
163                                         new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
164                                         new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
165                                         new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
166                                         new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
167                                         new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
168                                         );
169     }
170
171     /**
172      * @param tableOffset - new offset value
173      * @return ListenableFuture<List> - to indicate that tables have been synced
174      */
175     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
176         try {
177             verifyMaxTableId(tableOffset);
178         } catch (IllegalArgumentException e) {
179             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
180             // TODO - invalid offset value remains in conf DS
181             // It's not possible to validate offset value by using constrains in model,
182             // because number of tables in pipeline varies.
183             return Futures.immediateFuture(null);
184         }
185         List<Short> tableIDs = getTableIDs();
186         this.tableOffset = tableOffset;
187         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
188
189             @Override
190             public Void apply(Void tablesRemoved) {
191                 flowPipeline = createFlowPipeline();
192                 scheduleUpdate();
193                 return null;
194             }
195         });
196     }
197
198     /**
199      * @param  tableIDs - IDs of tables to delete
200      * @return ListenableFuture<Void> - which will be filled when clearing is done
201      */
202     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
203         List<ListenableFuture<Void>> checkList = new ArrayList<>();
204         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
205         for (Short tableId : tableIDs) {
206             for (NodeId nodeId : switchManager.getReadySwitches()) {
207                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
208                 checkList.add(deteleTableIfExists(rwTx, tablePath));
209             }
210         }
211         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
212         return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
213
214             @Override
215             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
216                 return rwTx.submit();
217             }
218         });
219     }
220
221     private List<Short> getTableIDs() {
222         List<Short> tableIds = new ArrayList<>();
223         tableIds.add(getTABLEID_PORTSECURITY());
224         tableIds.add(getTABLEID_INGRESS_NAT());
225         tableIds.add(getTABLEID_SOURCE_MAPPER());
226         tableIds.add(getTABLEID_DESTINATION_MAPPER());
227         tableIds.add(getTABLEID_POLICY_ENFORCER());
228         tableIds.add(getTABLEID_EGRESS_NAT());
229         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
230         return tableIds;
231     }
232
233     private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
234     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
235
236         @Override
237         public Void apply(Optional<Table> optTable) {
238             if(optTable.isPresent()){
239                 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
240             }
241             return null;
242         }});
243     }
244
245     // **************
246     // SwitchListener
247     // **************
248
249     public short getTABLEID_PORTSECURITY() {
250         return (short)(tableOffset+TABLEID_PORTSECURITY);
251     }
252
253
254     public short getTABLEID_INGRESS_NAT() {
255         return (short)(tableOffset+TABLEID_INGRESS_NAT);
256     }
257
258
259     public short getTABLEID_SOURCE_MAPPER() {
260         return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
261     }
262
263
264     public short getTABLEID_DESTINATION_MAPPER() {
265         return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
266     }
267
268
269     public short getTABLEID_POLICY_ENFORCER() {
270         return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
271     }
272
273
274     public short getTABLEID_EGRESS_NAT() {
275         return (short)(tableOffset+TABLEID_EGRESS_NAT);
276     }
277
278
279     public short getTABLEID_EXTERNAL_MAPPER() {
280         return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
281     }
282
283
284     public TableId verifyMaxTableId(short tableOffset) {
285         return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
286     }
287
288     @Override
289     public void switchReady(final NodeId nodeId) {
290         scheduleUpdate();
291     }
292
293     @Override
294     public void switchRemoved(NodeId sw) {
295         // XXX TODO purge switch flows
296         scheduleUpdate();
297     }
298
299     @Override
300     public void switchUpdated(NodeId sw) {
301         scheduleUpdate();
302     }
303
304     // ****************
305     // EndpointListener
306     // ****************
307
308     @Override
309     public void endpointUpdated(EpKey epKey) {
310         scheduleUpdate();
311     }
312
313     @Override
314     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
315         scheduleUpdate();
316     }
317
318     @Override
319     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
320         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
321         scheduleUpdate();
322     }
323
324     // **************
325     // PolicyListener
326     // **************
327
328     @Override
329     public void policyUpdated(Set<EgKey> updatedConsumers) {
330         scheduleUpdate();
331     }
332
333     // *************
334     // PolicyManager
335     // *************
336
337     /**
338      * Set the learning mode to the specified value
339      * @param learningMode the learning mode to set
340      */
341     public void setLearningMode(LearningMode learningMode) {
342         // No-op for now
343     }
344
345     // **************
346     // Implementation
347     // **************
348
349     private void scheduleUpdate() {
350         if (switchManager != null) {
351             LOG.trace("Scheduling flow update task");
352             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
353         }
354     }
355
356     /**
357      * Update the flows on a particular switch
358      */
359     private class SwitchFlowUpdateTask implements Callable<Void> {
360         private OfWriter ofWriter;
361
362         public SwitchFlowUpdateTask(OfWriter ofWriter) {
363             super();
364             this.ofWriter = ofWriter;
365         }
366
367         @Override
368         public Void call() throws Exception {
369             for (NodeId node : switchManager.getReadySwitches()) {
370                 PolicyInfo info = policyResolver.getCurrentPolicy();
371                 if (info == null)
372                     return null;
373                 for (OfTable table : flowPipeline) {
374                     try {
375                         table.update(node, info, ofWriter);
376                     } catch (Exception e) {
377                         LOG.error("Failed to write Openflow table {}",
378                                 table.getClass().getSimpleName(), e);
379                     }
380                 }
381             }
382             return null;
383         }
384     }
385
386     /**
387      * Update all flows on all switches as needed.  Note that this will block
388      * one of the threads on the executor.
389      */
390     private class FlowUpdateTask implements Runnable {
391         @Override
392         public void run() {
393             LOG.debug("Beginning flow update task");
394
395             CompletionService<Void> ecs
396                 = new ExecutorCompletionService<>(executor);
397             int n = 0;
398
399             OfWriter ofWriter = new OfWriter();
400
401             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
402             ecs.submit(swut);
403             n+=1;
404
405             for (int i = 0; i < n; i++) {
406                 try {
407                     ecs.take().get();
408                     ofWriter.commitToDataStore(dataBroker);
409                 } catch (InterruptedException | ExecutionException e) {
410                     LOG.error("Failed to update flow tables", e);
411                 }
412             }
413             LOG.debug("Flow update completed");
414         }
415     }
416
417
418
419
420
421 }