update deprecated transform and addCallback methods
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CompletionService;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.groupbasedpolicy.dto.EgKey;
33 import org.opendaylight.groupbasedpolicy.dto.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.destination.DestinationMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.egressnat.EgressNatMapper;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.external.ExternalMapper;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.ingressnat.IngressNatMapper;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.policyenforcer.PolicyEnforcer;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.portsecurity.PortSecurity;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.source.SourceMapper;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
47 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sfcutils.SfcIidFactory;
48 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
49 import org.opendaylight.groupbasedpolicy.util.IidFactory;
50 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
51 import org.opendaylight.yang.gen.v1.urn.ericsson.params.xml.ns.yang.sfc.of.renderer.rev151123.SfcOfRendererConfig;
52 import org.opendaylight.yang.gen.v1.urn.ericsson.params.xml.ns.yang.sfc.of.renderer.rev151123.SfcOfRendererConfigBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import com.google.common.base.Function;
69 import com.google.common.base.Optional;
70 import com.google.common.collect.ImmutableList;
71 import com.google.common.util.concurrent.AsyncFunction;
72 import com.google.common.util.concurrent.Futures;
73 import com.google.common.util.concurrent.ListenableFuture;
74 import com.google.common.util.concurrent.MoreExecutors;
75
76 /**
77  * Manage policies on switches by subscribing to updates from the
78  * policy resolver and information about endpoints from the endpoint
79  * registry
80  */
81 public class PolicyManager
82         implements SwitchListener, EndpointListener, ClusteredDataTreeChangeListener<ResolvedPolicy>, Closeable {
83
84     private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
85
86     private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
87
88     private short tableOffset;
89     private static final short TABLEID_PORTSECURITY = 0;
90     private static final short TABLEID_INGRESS_NAT = 1;
91     private static final short TABLEID_SOURCE_MAPPER = 2;
92     private static final short TABLEID_DESTINATION_MAPPER = 3;
93     private static final short TABLEID_POLICY_ENFORCER = 4;
94     private static final short TABLEID_EGRESS_NAT = 5;
95     private static final short TABLEID_EXTERNAL_MAPPER = 6;
96     private static final short TABLEID_SFC_INGRESS = 7;
97     private static final short TABLEID_SFC_EGRESS = 0;
98
99     private final SwitchManager switchManager;
100     private final EndpointManager endpointManager;
101
102     private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
103
104     private final ScheduledExecutorService executor;
105     private final SingletonTask flowUpdateTask;
106     private final DataBroker dataBroker;
107
108     /**
109      * The delay before triggering the flow update task in response to an
110      * event in milliseconds.
111      */
112     private final static int FLOW_UPDATE_DELAY = 250;
113
114     public PolicyManager(DataBroker dataBroker, SwitchManager switchManager, EndpointManager endpointManager,
115             ScheduledExecutorService executor, short tableOffset) {
116         super();
117         this.switchManager = switchManager;
118         this.executor = executor;
119         this.dataBroker = dataBroker;
120         this.tableOffset = tableOffset;
121         try {
122             // to validate against model
123             verifyMaxTableId(tableOffset);
124         } catch (IllegalArgumentException e) {
125             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
126                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
127         }
128
129         if (dataBroker != null) {
130             registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
131                     new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
132                             InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
133                     this);
134         } else {
135             registerDataTreeChangeListener = null;
136             LOG.error("DataBroker is null. Listener for {} was not registered.",
137                     ResolvedPolicy.class.getCanonicalName());
138         }
139         if (switchManager != null)
140             switchManager.registerListener(this);
141         this.endpointManager = endpointManager;
142         endpointManager.registerListener(this);
143
144         if (!setSfcTableOffset(TABLEID_SFC_INGRESS, TABLEID_SFC_EGRESS)) {
145             LOG.error("Could not set SFC Ingress Table offset.");
146         }
147         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
148         scheduleUpdate();
149
150         LOG.debug("Initialized OFOverlay policy manager");
151     }
152
153     private boolean setSfcTableOffset(short tableidSfcIngress, short tableidSfcEgress) {
154         SfcOfRendererConfig sfcOfRendererConfig = new SfcOfRendererConfigBuilder()
155             .setSfcOfTableOffset(tableidSfcIngress).setSfcOfAppEgressTableOffset(tableidSfcEgress).build();
156         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
157         wTx.put(LogicalDatastoreType.CONFIGURATION, SfcIidFactory.sfcOfRendererConfigIid(), sfcOfRendererConfig);
158         return DataStoreHelper.submitToDs(wTx);
159     }
160
161     private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
162         // TODO - PORTSECURITY is kept in table 0.
163         // According to openflow spec,processing on vSwitch always starts from table 0.
164         // Packets will be droped if table 0 is empty.
165         // Alternative workaround - table-miss flow entries in table 0.
166         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0), new GroupTable(ofCtx),
167                 new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
168                 new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
169                 new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
170                 new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
171                 new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
172                 new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER()));
173     }
174
175     /**
176      * @param tableOffset the new offset value
177      * @return {@link ListenableFuture} to indicate that tables have been synced
178      */
179     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
180         try {
181             verifyMaxTableId(tableOffset);
182         } catch (IllegalArgumentException e) {
183             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
184             // TODO - invalid offset value remains in conf DS
185             // It's not possible to validate offset value by using constrains in model,
186             // because number of tables in pipeline varies.
187             return Futures.immediateFuture(null);
188         }
189         List<Short> tableIDs = getTableIDs();
190         this.tableOffset = tableOffset;
191         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
192
193             @Override
194             public Void apply(Void tablesRemoved) {
195                 scheduleUpdate();
196                 return null;
197             }
198         }, MoreExecutors.directExecutor());
199     }
200
201     /**
202      * @param tableIDs - IDs of tables to delete
203      * @return ListenableFuture<Void> - which will be filled when clearing is done
204      */
205     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
206         List<ListenableFuture<Void>> checkList = new ArrayList<>();
207         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
208         for (Short tableId : tableIDs) {
209             for (NodeId nodeId : switchManager.getReadySwitches()) {
210                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
211                 checkList.add(deleteTableIfExists(rwTx, tablePath));
212             }
213         }
214         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
215         return Futures.transformAsync(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
216
217             @Override
218             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
219                 return rwTx.submit();
220             }
221         }, MoreExecutors.directExecutor());
222     }
223
224     private List<Short> getTableIDs() {
225         List<Short> tableIds = new ArrayList<>();
226         tableIds.add(getTABLEID_PORTSECURITY());
227         tableIds.add(getTABLEID_INGRESS_NAT());
228         tableIds.add(getTABLEID_SOURCE_MAPPER());
229         tableIds.add(getTABLEID_DESTINATION_MAPPER());
230         tableIds.add(getTABLEID_POLICY_ENFORCER());
231         tableIds.add(getTABLEID_EGRESS_NAT());
232         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
233         return tableIds;
234     }
235
236     private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx,
237             final InstanceIdentifier<Table> tablePath) {
238         return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath),
239                 new Function<Optional<Table>, Void>() {
240
241                     @Override
242                     public Void apply(Optional<Table> optTable) {
243                         if (optTable.isPresent()) {
244                             rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
245                         }
246                         return null;
247                     }
248                 }, MoreExecutors.directExecutor());
249     }
250
251     // **************
252     // SwitchListener
253     // **************
254
255     public short getTABLEID_PORTSECURITY() {
256         return (short) (tableOffset + TABLEID_PORTSECURITY);
257     }
258
259     public short getTABLEID_INGRESS_NAT() {
260         return (short) (tableOffset + TABLEID_INGRESS_NAT);
261     }
262
263     public short getTABLEID_SOURCE_MAPPER() {
264         return (short) (tableOffset + TABLEID_SOURCE_MAPPER);
265     }
266
267     public short getTABLEID_DESTINATION_MAPPER() {
268         return (short) (tableOffset + TABLEID_DESTINATION_MAPPER);
269     }
270
271     public short getTABLEID_POLICY_ENFORCER() {
272         return (short) (tableOffset + TABLEID_POLICY_ENFORCER);
273     }
274
275     public short getTABLEID_EGRESS_NAT() {
276         return (short) (tableOffset + TABLEID_EGRESS_NAT);
277     }
278
279     public short getTABLEID_EXTERNAL_MAPPER() {
280         return (short) (tableOffset + TABLEID_EXTERNAL_MAPPER);
281     }
282
283     public short getTABLEID_SFC_EGRESS() {
284         return TABLEID_SFC_EGRESS;
285     }
286
287     public short getTABLEID_SFC_INGRESS() {
288         return TABLEID_SFC_INGRESS;
289     }
290
291     public TableId verifyMaxTableId(short tableOffset) {
292         return new TableId((short) (tableOffset + TABLEID_EXTERNAL_MAPPER));
293     }
294
295     @Override
296     public void switchReady(final NodeId nodeId) {
297         scheduleUpdate();
298     }
299
300     @Override
301     public void switchRemoved(NodeId sw) {
302         // XXX TODO purge switch flows
303         scheduleUpdate();
304     }
305
306     @Override
307     public void switchUpdated(NodeId sw) {
308         scheduleUpdate();
309     }
310
311     // ****************
312     // EndpointListener
313     // ****************
314
315     @Override
316     public void endpointUpdated(EpKey epKey) {
317         scheduleUpdate();
318     }
319
320     @Override
321     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
322         scheduleUpdate();
323     }
324
325     @Override
326     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
327         // TODO a renderer should remove followed-EPG and followed-tenant at some point
328         if (dataBroker == null) {
329             LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
330             return;
331         }
332         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
333         FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
334         wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
335                 egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
336         DataStoreHelper.submitToDs(wTx);
337         scheduleUpdate();
338     }
339
340     // **************
341     // ClusteredDataTreeChangeListener<ResolvedPolicy>
342     // **************
343
344     @Override
345     public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
346         scheduleUpdate();
347     }
348
349     // *************
350     // PolicyManager
351     // *************
352
353     /**
354      * Set the learning mode to the specified value
355      *
356      * @param learningMode the learning mode to set
357      */
358     public void setLearningMode(LearningMode learningMode) {
359         // No-op for now
360     }
361
362     // **************
363     // Implementation
364     // **************
365
366     private void scheduleUpdate() {
367         if (switchManager != null) {
368             LOG.trace("Scheduling flow update task");
369             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
370         }
371     }
372
373     /**
374      * Update the flows on a particular switch
375      */
376     private class SwitchFlowUpdateTask implements Callable<Void> {
377
378         private final OfWriter ofWriter;
379
380         public SwitchFlowUpdateTask(OfWriter ofWriter) {
381             this.ofWriter = ofWriter;
382         }
383
384         @Override
385         public Void call() throws Exception {
386             OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
387             if (ofCtx.getCurrentPolicy() == null)
388                 return null;
389             List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
390             for (OfTable table : flowPipeline) {
391                 try {
392                     for (Endpoint endpoint : endpointManager.getEndpoints()) {
393                         if (switchManager.getReadySwitches().contains(endpointManager.getEndpointNodeId(endpoint))) {
394                             table.sync(endpoint, ofWriter);
395                         }
396                     }
397                 } catch (Exception e) {
398                     LOG.error("Failed to write Openflow table {}", table.getClass().getSimpleName(), e);
399                 }
400             }
401
402             return null;
403         }
404     }
405
406     /**
407      * Update all flows on all switches as needed. Note that this will block
408      * one of the threads on the executor.
409      */
410     private class FlowUpdateTask implements Runnable {
411
412         @Override
413         public void run() {
414             LOG.debug("Beginning flow update task");
415
416             CompletionService<Void> ecs = new ExecutorCompletionService<>(executor);
417
418             OfWriter ofWriter = new OfWriter();
419
420             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
421             ecs.submit(swut);
422
423             try {
424                 ecs.take().get();
425                 // Current gbp flow must be independent, find out where this run() ends,
426                 // set flows to one field and reset another
427                 Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
428                 actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
429                 previousGbpFlows = actualGbpFlows;
430             } catch (InterruptedException | ExecutionException e) {
431                 LOG.error("Failed to update flow tables", e);
432             }
433             LOG.debug("Flow update completed");
434         }
435     }
436
437     @Override
438     public void close() throws IOException {
439         if (registerDataTreeChangeListener != null)
440             registerDataTreeChangeListener.close();
441         // TODO unregister classifier and action instance validators
442     }
443
444 }