BUG:6743 switch to clustered data listener
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CompletionService;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.groupbasedpolicy.dto.EgKey;
33 import org.opendaylight.groupbasedpolicy.dto.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.destination.DestinationMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.egressnat.EgressNatMapper;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.external.ExternalMapper;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.ingressnat.IngressNatMapper;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.policyenforcer.PolicyEnforcer;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.portsecurity.PortSecurity;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.mapper.source.SourceMapper;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
47 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sfcutils.SfcIidFactory;
48 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
49 import org.opendaylight.groupbasedpolicy.util.IidFactory;
50 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
51 import org.opendaylight.yang.gen.v1.urn.ericsson.params.xml.ns.yang.sfc.of.renderer.rev151123.SfcOfRendererConfig;
52 import org.opendaylight.yang.gen.v1.urn.ericsson.params.xml.ns.yang.sfc.of.renderer.rev151123.SfcOfRendererConfigBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.Endpoint;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import com.google.common.base.Function;
69 import com.google.common.base.Optional;
70 import com.google.common.collect.ImmutableList;
71 import com.google.common.util.concurrent.AsyncFunction;
72 import com.google.common.util.concurrent.Futures;
73 import com.google.common.util.concurrent.ListenableFuture;
74
75 /**
76  * Manage policies on switches by subscribing to updates from the
77  * policy resolver and information about endpoints from the endpoint
78  * registry
79  */
80 public class PolicyManager
81         implements SwitchListener, EndpointListener, ClusteredDataTreeChangeListener<ResolvedPolicy>, Closeable {
82
83     private static final Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
84
85     private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
86
87     private short tableOffset;
88     private static final short TABLEID_PORTSECURITY = 0;
89     private static final short TABLEID_INGRESS_NAT = 1;
90     private static final short TABLEID_SOURCE_MAPPER = 2;
91     private static final short TABLEID_DESTINATION_MAPPER = 3;
92     private static final short TABLEID_POLICY_ENFORCER = 4;
93     private static final short TABLEID_EGRESS_NAT = 5;
94     private static final short TABLEID_EXTERNAL_MAPPER = 6;
95     private static final short TABLEID_SFC_INGRESS = 7;
96     private static final short TABLEID_SFC_EGRESS = 0;
97
98     private final SwitchManager switchManager;
99     private final EndpointManager endpointManager;
100
101     private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
102
103     private final ScheduledExecutorService executor;
104     private final SingletonTask flowUpdateTask;
105     private final DataBroker dataBroker;
106
107     /**
108      * The delay before triggering the flow update task in response to an
109      * event in milliseconds.
110      */
111     private final static int FLOW_UPDATE_DELAY = 250;
112
113     public PolicyManager(DataBroker dataBroker, SwitchManager switchManager, EndpointManager endpointManager,
114             ScheduledExecutorService executor, short tableOffset) {
115         super();
116         this.switchManager = switchManager;
117         this.executor = executor;
118         this.dataBroker = dataBroker;
119         this.tableOffset = tableOffset;
120         try {
121             // to validate against model
122             verifyMaxTableId(tableOffset);
123         } catch (IllegalArgumentException e) {
124             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
125                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
126         }
127
128         if (dataBroker != null) {
129             registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
130                     new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
131                             InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
132                     this);
133         } else {
134             registerDataTreeChangeListener = null;
135             LOG.error("DataBroker is null. Listener for {} was not registered.",
136                     ResolvedPolicy.class.getCanonicalName());
137         }
138         if (switchManager != null)
139             switchManager.registerListener(this);
140         this.endpointManager = endpointManager;
141         endpointManager.registerListener(this);
142
143         if (!setSfcTableOffset(TABLEID_SFC_INGRESS, TABLEID_SFC_EGRESS)) {
144             LOG.error("Could not set SFC Ingress Table offset.");
145         }
146         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
147         scheduleUpdate();
148
149         LOG.debug("Initialized OFOverlay policy manager");
150     }
151
152     private boolean setSfcTableOffset(short tableidSfcIngress, short tableidSfcEgress) {
153         SfcOfRendererConfig sfcOfRendererConfig = new SfcOfRendererConfigBuilder()
154             .setSfcOfTableOffset(tableidSfcIngress).setSfcOfAppEgressTableOffset(tableidSfcEgress).build();
155         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
156         wTx.put(LogicalDatastoreType.CONFIGURATION, SfcIidFactory.sfcOfRendererConfigIid(), sfcOfRendererConfig);
157         return DataStoreHelper.submitToDs(wTx);
158     }
159
160     private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
161         // TODO - PORTSECURITY is kept in table 0.
162         // According to openflow spec,processing on vSwitch always starts from table 0.
163         // Packets will be droped if table 0 is empty.
164         // Alternative workaround - table-miss flow entries in table 0.
165         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0), new GroupTable(ofCtx),
166                 new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
167                 new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
168                 new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
169                 new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
170                 new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
171                 new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER()));
172     }
173
174     /**
175      * @param tableOffset the new offset value
176      * @return {@link ListenableFuture} to indicate that tables have been synced
177      */
178     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
179         try {
180             verifyMaxTableId(tableOffset);
181         } catch (IllegalArgumentException e) {
182             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
183             // TODO - invalid offset value remains in conf DS
184             // It's not possible to validate offset value by using constrains in model,
185             // because number of tables in pipeline varies.
186             return Futures.immediateFuture(null);
187         }
188         List<Short> tableIDs = getTableIDs();
189         this.tableOffset = tableOffset;
190         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
191
192             @Override
193             public Void apply(Void tablesRemoved) {
194                 scheduleUpdate();
195                 return null;
196             }
197         });
198     }
199
200     /**
201      * @param tableIDs - IDs of tables to delete
202      * @return ListenableFuture<Void> - which will be filled when clearing is done
203      */
204     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
205         List<ListenableFuture<Void>> checkList = new ArrayList<>();
206         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
207         for (Short tableId : tableIDs) {
208             for (NodeId nodeId : switchManager.getReadySwitches()) {
209                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
210                 checkList.add(deleteTableIfExists(rwTx, tablePath));
211             }
212         }
213         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
214         return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
215
216             @Override
217             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
218                 return rwTx.submit();
219             }
220         });
221     }
222
223     private List<Short> getTableIDs() {
224         List<Short> tableIds = new ArrayList<>();
225         tableIds.add(getTABLEID_PORTSECURITY());
226         tableIds.add(getTABLEID_INGRESS_NAT());
227         tableIds.add(getTABLEID_SOURCE_MAPPER());
228         tableIds.add(getTABLEID_DESTINATION_MAPPER());
229         tableIds.add(getTABLEID_POLICY_ENFORCER());
230         tableIds.add(getTABLEID_EGRESS_NAT());
231         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
232         return tableIds;
233     }
234
235     private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx,
236             final InstanceIdentifier<Table> tablePath) {
237         return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath),
238                 new Function<Optional<Table>, Void>() {
239
240                     @Override
241                     public Void apply(Optional<Table> optTable) {
242                         if (optTable.isPresent()) {
243                             rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
244                         }
245                         return null;
246                     }
247                 });
248     }
249
250     // **************
251     // SwitchListener
252     // **************
253
254     public short getTABLEID_PORTSECURITY() {
255         return (short) (tableOffset + TABLEID_PORTSECURITY);
256     }
257
258     public short getTABLEID_INGRESS_NAT() {
259         return (short) (tableOffset + TABLEID_INGRESS_NAT);
260     }
261
262     public short getTABLEID_SOURCE_MAPPER() {
263         return (short) (tableOffset + TABLEID_SOURCE_MAPPER);
264     }
265
266     public short getTABLEID_DESTINATION_MAPPER() {
267         return (short) (tableOffset + TABLEID_DESTINATION_MAPPER);
268     }
269
270     public short getTABLEID_POLICY_ENFORCER() {
271         return (short) (tableOffset + TABLEID_POLICY_ENFORCER);
272     }
273
274     public short getTABLEID_EGRESS_NAT() {
275         return (short) (tableOffset + TABLEID_EGRESS_NAT);
276     }
277
278     public short getTABLEID_EXTERNAL_MAPPER() {
279         return (short) (tableOffset + TABLEID_EXTERNAL_MAPPER);
280     }
281
282     public short getTABLEID_SFC_EGRESS() {
283         return TABLEID_SFC_EGRESS;
284     }
285
286     public short getTABLEID_SFC_INGRESS() {
287         return TABLEID_SFC_INGRESS;
288     }
289
290     public TableId verifyMaxTableId(short tableOffset) {
291         return new TableId((short) (tableOffset + TABLEID_EXTERNAL_MAPPER));
292     }
293
294     @Override
295     public void switchReady(final NodeId nodeId) {
296         scheduleUpdate();
297     }
298
299     @Override
300     public void switchRemoved(NodeId sw) {
301         // XXX TODO purge switch flows
302         scheduleUpdate();
303     }
304
305     @Override
306     public void switchUpdated(NodeId sw) {
307         scheduleUpdate();
308     }
309
310     // ****************
311     // EndpointListener
312     // ****************
313
314     @Override
315     public void endpointUpdated(EpKey epKey) {
316         scheduleUpdate();
317     }
318
319     @Override
320     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
321         scheduleUpdate();
322     }
323
324     @Override
325     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
326         // TODO a renderer should remove followed-EPG and followed-tenant at some point
327         if (dataBroker == null) {
328             LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
329             return;
330         }
331         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
332         FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
333         wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
334                 egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
335         DataStoreHelper.submitToDs(wTx);
336         scheduleUpdate();
337     }
338
339     // **************
340     // ClusteredDataTreeChangeListener<ResolvedPolicy>
341     // **************
342
343     @Override
344     public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
345         scheduleUpdate();
346     }
347
348     // *************
349     // PolicyManager
350     // *************
351
352     /**
353      * Set the learning mode to the specified value
354      *
355      * @param learningMode the learning mode to set
356      */
357     public void setLearningMode(LearningMode learningMode) {
358         // No-op for now
359     }
360
361     // **************
362     // Implementation
363     // **************
364
365     private void scheduleUpdate() {
366         if (switchManager != null) {
367             LOG.trace("Scheduling flow update task");
368             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
369         }
370     }
371
372     /**
373      * Update the flows on a particular switch
374      */
375     private class SwitchFlowUpdateTask implements Callable<Void> {
376
377         private final OfWriter ofWriter;
378
379         public SwitchFlowUpdateTask(OfWriter ofWriter) {
380             this.ofWriter = ofWriter;
381         }
382
383         @Override
384         public Void call() throws Exception {
385             OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
386             if (ofCtx.getCurrentPolicy() == null)
387                 return null;
388             List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
389             for (OfTable table : flowPipeline) {
390                 try {
391                     for (Endpoint endpoint : endpointManager.getEndpoints()) {
392                         if (switchManager.getReadySwitches().contains(endpointManager.getEndpointNodeId(endpoint))) {
393                             table.sync(endpoint, ofWriter);
394                         }
395                     }
396                 } catch (Exception e) {
397                     LOG.error("Failed to write Openflow table {}", table.getClass().getSimpleName(), e);
398                 }
399             }
400
401             return null;
402         }
403     }
404
405     /**
406      * Update all flows on all switches as needed. Note that this will block
407      * one of the threads on the executor.
408      */
409     private class FlowUpdateTask implements Runnable {
410
411         @Override
412         public void run() {
413             LOG.debug("Beginning flow update task");
414
415             CompletionService<Void> ecs = new ExecutorCompletionService<>(executor);
416
417             OfWriter ofWriter = new OfWriter();
418
419             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
420             ecs.submit(swut);
421
422             try {
423                 ecs.take().get();
424                 // Current gbp flow must be independent, find out where this run() ends,
425                 // set flows to one field and reset another
426                 Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
427                 actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
428                 previousGbpFlows = actualGbpFlows;
429             } catch (InterruptedException | ExecutionException e) {
430                 LOG.error("Failed to update flow tables", e);
431             }
432             LOG.debug("Flow update completed");
433         }
434     }
435
436     @Override
437     public void close() throws IOException {
438         if (registerDataTreeChangeListener != null)
439             registerDataTreeChangeListener.close();
440         // TODO unregister classifier and action instance validators
441     }
442
443 }