groupbasedpolicy.git: renderers/ofoverlay/src/main/java/org/opendaylight/groupbasedpolicy/renderer/ofoverlay/PolicyManager.java
/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;

import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
import org.opendaylight.groupbasedpolicy.resolver.EgKey;
import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages policies on switches by subscribing to updates from the
 * policy resolver and to endpoint information from the endpoint
 * registry.
 */
public class PolicyManager
     implements SwitchListener, PolicyListener, EndpointListener {
    private static final Logger LOG =
            LoggerFactory.getLogger(PolicyManager.class);

    private short tableOffset;
    private static final short TABLEID_PORTSECURITY = 0;
    private static final short TABLEID_INGRESS_NAT = 1;
    private static final short TABLEID_SOURCE_MAPPER = 2;
    private static final short TABLEID_DESTINATION_MAPPER = 3;
    private static final short TABLEID_POLICY_ENFORCER = 4;
    private static final short TABLEID_EGRESS_NAT = 5;
    private static final short TABLEID_EXTERNAL_MAPPER = 6;

    private final SwitchManager switchManager;
    private final PolicyResolver policyResolver;

    private final PolicyScope policyScope;

    private final ScheduledExecutorService executor;
    private final SingletonTask flowUpdateTask;
    private final DataBroker dataBroker;
    private final OfContext ofCtx;

    /**
     * The flow tables that make up the processing pipeline.
     */
    private List<? extends OfTable> flowPipeline;

    /**
     * The delay, in milliseconds, before triggering the flow update task in
     * response to an event.
     */
    private static final int FLOW_UPDATE_DELAY = 250;

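    /**
     * Wires up the renderer pipeline against the supplied infrastructure services,
     * publishes the OF-Overlay subject-feature definitions, and registers this
     * manager as a switch, policy and endpoint listener.
     *
     * <p>A minimal construction sketch (illustrative only; the collaborators are
     * assumed to be created by the surrounding module wiring, e.g. the
     * config subsystem):
     *
     * <pre>{@code
     * PolicyManager policyManager = new PolicyManager(dataBroker, policyResolver,
     *         switchManager, endpointManager, rpcRegistry, executor, (short) 0);
     * }</pre>
     *
     * A {@code tableOffset} of 0 keeps the pipeline in OpenFlow tables 0-6; a
     * non-zero offset shifts every table except PortSecurity, which always stays
     * in table 0 (see {@link #createFlowPipeline()}).
     */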
    public PolicyManager(DataBroker dataBroker,
                         PolicyResolver policyResolver,
                         SwitchManager switchManager,
                         EndpointManager endpointManager,
                         RpcProviderRegistry rpcRegistry,
                         ScheduledExecutorService executor,
                         short tableOffset) {
        this.switchManager = switchManager;
        this.executor = executor;
        this.policyResolver = policyResolver;
        this.dataBroker = dataBroker;
        this.tableOffset = tableOffset;
        try {
            // validate the offset against the model
            verifyMaxTableId(tableOffset);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Failed to start OF-Overlay renderer. "
                    + "Max. table ID would be out of range. Check config-subsystem.", e);
        }

        if (dataBroker != null) {
            WriteTransaction t = dataBroker.newWriteOnlyTransaction();
            t.put(LogicalDatastoreType.OPERATIONAL,
                  InstanceIdentifier
                      .builder(SubjectFeatureDefinitions.class)
                      .build(),
                  SubjectFeatures.OF_OVERLAY_FEATURES);
            t.submit();
        }

        for (Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
            policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
        }

        ofCtx = new OfContext(dataBroker, rpcRegistry,
                              this, policyResolver, switchManager,
                              endpointManager, executor);

        flowPipeline = createFlowPipeline();

        policyScope = policyResolver.registerListener(this);
        if (switchManager != null) {
            switchManager.registerListener(this);
        }
        endpointManager.registerListener(this);

        flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
        scheduleUpdate();

        LOG.debug("Initialized OFOverlay policy manager");
    }

    private List<? extends OfTable> createFlowPipeline() {
        // TODO - PORTSECURITY is kept in table 0.
        // According to the OpenFlow spec, processing on a vSwitch always starts from table 0,
        // and packets will be dropped if table 0 is empty.
        // Alternative workaround: table-miss flow entries in table 0.
        return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
                                new GroupTable(ofCtx),
                                new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
                                new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
                                new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
                                new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
                                new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
                                new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER()));
    }
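
    /*
     * Worked example (illustrative only): with tableOffset = 2 the ID getters
     * below resolve to IngressNat 3, SourceMapper 4, DestinationMapper 5,
     * PolicyEnforcer 6, EgressNat 7 and ExternalMapper 8, while PortSecurity
     * flows are still written to table 0 (hard-coded above, see the TODO),
     * even though getTABLEID_PORTSECURITY() would report 2.
     */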

    /**
     * Moves the OpenFlow pipeline to a new table offset: the tables used by the
     * old offset are removed and the pipeline is rebuilt at the new one.
     *
     * @param tableOffset the new offset value
     * @return {@link ListenableFuture} to indicate that tables have been synced
     */
    public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
        try {
            verifyMaxTableId(tableOffset);
        } catch (IllegalArgumentException e) {
            LOG.error("Cannot update table offset. Max. table ID would be out of range.", e);
            // TODO - invalid offset value remains in conf DS
            // It's not possible to validate the offset value by using constraints in the model,
            // because the number of tables in the pipeline varies.
            return Futures.immediateFuture(null);
        }
        List<Short> tableIDs = getTableIDs();
        this.tableOffset = tableOffset;
        return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {

            @Override
            public Void apply(Void tablesRemoved) {
                flowPipeline = createFlowPipeline();
                scheduleUpdate();
                return null;
            }
        });
    }

    /**
     * @param tableIDs IDs of the tables to delete
     * @return a {@link ListenableFuture} that completes when the tables have been removed
     */
    private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
        List<ListenableFuture<Void>> checkList = new ArrayList<>();
        final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
        for (Short tableId : tableIDs) {
            for (NodeId nodeId : switchManager.getReadySwitches()) {
                final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
                checkList.add(deleteTableIfExists(rwTx, tablePath));
            }
        }
        ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
        return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {

            @Override
            public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
                return rwTx.submit();
            }
        });
    }

    private List<Short> getTableIDs() {
        List<Short> tableIds = new ArrayList<>();
        tableIds.add(getTABLEID_PORTSECURITY());
        tableIds.add(getTABLEID_INGRESS_NAT());
        tableIds.add(getTABLEID_SOURCE_MAPPER());
        tableIds.add(getTABLEID_DESTINATION_MAPPER());
        tableIds.add(getTABLEID_POLICY_ENFORCER());
        tableIds.add(getTABLEID_EGRESS_NAT());
        tableIds.add(getTABLEID_EXTERNAL_MAPPER());
        return tableIds;
    }

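    /**
     * Deletes the given table from the CONFIGURATION datastore if it is present,
     * reusing the caller's transaction; the transaction is submitted by
     * {@link #removeUnusedTables(List)}, not here.
     */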
    private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx,
            final InstanceIdentifier<Table> tablePath) {
        return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath),
                new Function<Optional<Table>, Void>() {

                    @Override
                    public Void apply(Optional<Table> optTable) {
                        if (optTable.isPresent()) {
                            rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
                        }
                        return null;
                    }
                });
    }

    // **************
    // Table IDs
    // **************

    public short getTABLEID_PORTSECURITY() {
        return (short) (tableOffset + TABLEID_PORTSECURITY);
    }

    public short getTABLEID_INGRESS_NAT() {
        return (short) (tableOffset + TABLEID_INGRESS_NAT);
    }

    public short getTABLEID_SOURCE_MAPPER() {
        return (short) (tableOffset + TABLEID_SOURCE_MAPPER);
    }

    public short getTABLEID_DESTINATION_MAPPER() {
        return (short) (tableOffset + TABLEID_DESTINATION_MAPPER);
    }

    public short getTABLEID_POLICY_ENFORCER() {
        return (short) (tableOffset + TABLEID_POLICY_ENFORCER);
    }

    public short getTABLEID_EGRESS_NAT() {
        return (short) (tableOffset + TABLEID_EGRESS_NAT);
    }

    public short getTABLEID_EXTERNAL_MAPPER() {
        return (short) (tableOffset + TABLEID_EXTERNAL_MAPPER);
    }

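    /**
     * Validates the offset by constructing the {@link TableId} of the last table
     * in the pipeline; the generated {@code TableId} constructor is expected to
     * throw an {@link IllegalArgumentException} when the resulting ID falls
     * outside the range allowed by the model.
     */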
    public TableId verifyMaxTableId(short tableOffset) {
        return new TableId((short) (tableOffset + TABLEID_EXTERNAL_MAPPER));
    }

    // **************
    // SwitchListener
    // **************

    @Override
    public void switchReady(final NodeId nodeId) {
        scheduleUpdate();
    }

    @Override
    public void switchRemoved(NodeId sw) {
        // XXX TODO purge switch flows
        scheduleUpdate();
    }

    @Override
    public void switchUpdated(NodeId sw) {
        scheduleUpdate();
    }

    // ****************
    // EndpointListener
    // ****************

    @Override
    public void endpointUpdated(EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
        scheduleUpdate();
    }

    // **************
    // PolicyListener
    // **************

    @Override
    public void policyUpdated(Set<EgKey> updatedConsumers) {
        scheduleUpdate();
    }

    // *************
    // PolicyManager
    // *************

    /**
     * Sets the learning mode to the specified value.
     *
     * @param learningMode the learning mode to set
     */
    public void setLearningMode(LearningMode learningMode) {
        // No-op for now
    }

    // **************
    // Implementation
    // **************

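    /**
     * Reschedules the shared {@link SingletonTask}; {@link #FLOW_UPDATE_DELAY} is
     * intended to coalesce bursts of switch, endpoint and policy events into a
     * single {@link FlowUpdateTask} run.
     */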
    private void scheduleUpdate() {
        if (switchManager != null) {
            LOG.trace("Scheduling flow update task");
            flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Update the flows for each ready switch, staging the changes in the
     * supplied {@link OfWriter}.
     */
    private class SwitchFlowUpdateTask implements Callable<Void> {
        private final OfWriter ofWriter;

        public SwitchFlowUpdateTask(OfWriter ofWriter) {
            this.ofWriter = ofWriter;
        }

        @Override
        public Void call() throws Exception {
            for (NodeId node : switchManager.getReadySwitches()) {
                PolicyInfo info = policyResolver.getCurrentPolicy();
                if (info == null) {
                    return null;
                }
                for (OfTable table : flowPipeline) {
                    try {
                        table.update(node, info, ofWriter);
                    } catch (Exception e) {
                        LOG.error("Failed to write OpenFlow table {}",
                                table.getClass().getSimpleName(), e);
                    }
                }
            }
            return null;
        }
    }

    /**
     * Update all flows on all switches as needed. Note that this will block
     * one of the threads on the executor.
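     * Flow changes are staged in a fresh {@link OfWriter} and committed to the
     * datastore once the per-switch computation has completed.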
     */
    private class FlowUpdateTask implements Runnable {
        @Override
        public void run() {
            LOG.debug("Beginning flow update task");

            CompletionService<Void> ecs =
                    new ExecutorCompletionService<>(executor);
            int n = 0;

            OfWriter ofWriter = new OfWriter();

            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
            ecs.submit(swut);
            n += 1;

            for (int i = 0; i < n; i++) {
                try {
                    ecs.take().get();
                    ofWriter.commitToDataStore(dataBroker);
                } catch (InterruptedException | ExecutionException e) {
                    LOG.error("Failed to update flow tables", e);
                }
            }
            LOG.debug("Flow update completed");
        }
    }
}