Merge "Bug 5208 - flows don't get removed on update"
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CompletionService;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.groupbasedpolicy.dto.EgKey;
33 import org.opendaylight.groupbasedpolicy.dto.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
47 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
48 import org.opendaylight.groupbasedpolicy.util.IidFactory;
49 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
59 import org.opendaylight.yangtools.concepts.ListenerRegistration;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import com.google.common.base.Function;
65 import com.google.common.base.Optional;
66 import com.google.common.collect.ImmutableList;
67 import com.google.common.util.concurrent.AsyncFunction;
68 import com.google.common.util.concurrent.Futures;
69 import com.google.common.util.concurrent.ListenableFuture;
70
71 /**
72  * Manage policies on switches by subscribing to updates from the
73  * policy resolver and information about endpoints from the endpoint
74  * registry
75  */
76 public class PolicyManager
77      implements SwitchListener, EndpointListener, DataTreeChangeListener<ResolvedPolicy>, Closeable {
78     private static final Logger LOG =
79             LoggerFactory.getLogger(PolicyManager.class);
80
81     private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
82
83     private short tableOffset;
84     private static final short TABLEID_PORTSECURITY = 0;
85     private static final short TABLEID_INGRESS_NAT =  1;
86     private static final short TABLEID_SOURCE_MAPPER = 2;
87     private static final short TABLEID_DESTINATION_MAPPER = 3;
88     private static final short TABLEID_POLICY_ENFORCER = 4;
89     private static final short TABLEID_EGRESS_NAT = 5;
90     private static final short TABLEID_EXTERNAL_MAPPER = 6;
91
92     private final SwitchManager switchManager;
93     private final EndpointManager endpointManager;
94
95     private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
96
97     private final ScheduledExecutorService executor;
98     private final SingletonTask flowUpdateTask;
99     private final DataBroker dataBroker;
100
101     /**
102      * The delay before triggering the flow update task in response to an
103      * event in milliseconds.
104      */
105     private final static int FLOW_UPDATE_DELAY = 250;
106
107     public PolicyManager(DataBroker dataBroker,
108                          SwitchManager switchManager,
109                          EndpointManager endpointManager,
110                          ScheduledExecutorService executor,
111                          short tableOffset) {
112         super();
113         this.switchManager = switchManager;
114         this.executor = executor;
115         this.dataBroker = dataBroker;
116         this.tableOffset = tableOffset;
117         try {
118             // to validate against model
119             verifyMaxTableId(tableOffset);
120         } catch (IllegalArgumentException e) {
121             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
122                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
123         }
124
125         if (dataBroker != null) {
126             registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
127                     new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
128                             InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
129                     this);
130         } else {
131             registerDataTreeChangeListener = null;
132             LOG.error("DataBroker is null. Listener for {} was not registered.",
133                     ResolvedPolicy.class.getCanonicalName());
134         }
135         if (switchManager != null)
136             switchManager.registerListener(this);
137         this.endpointManager = endpointManager;
138         endpointManager.registerListener(this);
139
140         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
141         scheduleUpdate();
142
143         LOG.debug("Initialized OFOverlay policy manager");
144     }
145
146     private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
147         // TODO - PORTSECURITY is kept in table 0.
148         // According to openflow spec,processing on vSwitch always starts from table 0.
149         // Packets will be droped if table 0 is empty.
150         // Alternative workaround - table-miss flow entries in table 0.
151         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
152                                         new GroupTable(ofCtx),
153                                         new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
154                                         new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
155                                         new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
156                                         new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
157                                         new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
158                                         new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
159                                         );
160     }
161
162     /**
163      * @param tableOffset the new offset value
164      * @return {@link ListenableFuture} to indicate that tables have been synced
165      */
166     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
167         try {
168             verifyMaxTableId(tableOffset);
169         } catch (IllegalArgumentException e) {
170             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
171             // TODO - invalid offset value remains in conf DS
172             // It's not possible to validate offset value by using constrains in model,
173             // because number of tables in pipeline varies.
174             return Futures.immediateFuture(null);
175         }
176         List<Short> tableIDs = getTableIDs();
177         this.tableOffset = tableOffset;
178         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
179
180             @Override
181             public Void apply(Void tablesRemoved) {
182                 scheduleUpdate();
183                 return null;
184             }
185         });
186     }
187
188     /**
189      * @param  tableIDs - IDs of tables to delete
190      * @return ListenableFuture<Void> - which will be filled when clearing is done
191      */
192     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
193         List<ListenableFuture<Void>> checkList = new ArrayList<>();
194         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
195         for (Short tableId : tableIDs) {
196             for (NodeId nodeId : switchManager.getReadySwitches()) {
197                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
198                 checkList.add(deleteTableIfExists(rwTx, tablePath));
199             }
200         }
201         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
202         return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
203
204             @Override
205             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
206                 return rwTx.submit();
207             }
208         });
209     }
210
211     private List<Short> getTableIDs() {
212         List<Short> tableIds = new ArrayList<>();
213         tableIds.add(getTABLEID_PORTSECURITY());
214         tableIds.add(getTABLEID_INGRESS_NAT());
215         tableIds.add(getTABLEID_SOURCE_MAPPER());
216         tableIds.add(getTABLEID_DESTINATION_MAPPER());
217         tableIds.add(getTABLEID_POLICY_ENFORCER());
218         tableIds.add(getTABLEID_EGRESS_NAT());
219         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
220         return tableIds;
221     }
222
223     private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
224     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
225
226         @Override
227         public Void apply(Optional<Table> optTable) {
228             if(optTable.isPresent()){
229                 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
230             }
231             return null;
232         }});
233     }
234
235     // **************
236     // SwitchListener
237     // **************
238
239     public short getTABLEID_PORTSECURITY() {
240         return (short)(tableOffset+TABLEID_PORTSECURITY);
241     }
242
243
244     public short getTABLEID_INGRESS_NAT() {
245         return (short)(tableOffset+TABLEID_INGRESS_NAT);
246     }
247
248
249     public short getTABLEID_SOURCE_MAPPER() {
250         return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
251     }
252
253
254     public short getTABLEID_DESTINATION_MAPPER() {
255         return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
256     }
257
258
259     public short getTABLEID_POLICY_ENFORCER() {
260         return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
261     }
262
263
264     public short getTABLEID_EGRESS_NAT() {
265         return (short)(tableOffset+TABLEID_EGRESS_NAT);
266     }
267
268
269     public short getTABLEID_EXTERNAL_MAPPER() {
270         return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
271     }
272
273
274     public TableId verifyMaxTableId(short tableOffset) {
275         return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
276     }
277
278     @Override
279     public void switchReady(final NodeId nodeId) {
280         scheduleUpdate();
281     }
282
283     @Override
284     public void switchRemoved(NodeId sw) {
285         // XXX TODO purge switch flows
286         scheduleUpdate();
287     }
288
289     @Override
290     public void switchUpdated(NodeId sw) {
291         scheduleUpdate();
292     }
293
294     // ****************
295     // EndpointListener
296     // ****************
297
298     @Override
299     public void endpointUpdated(EpKey epKey) {
300         scheduleUpdate();
301     }
302
303     @Override
304     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
305         scheduleUpdate();
306     }
307
308     @Override
309     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
310         // TODO a renderer should remove followed-EPG and followed-tenant at some point
311         if (dataBroker == null) {
312             LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
313             return;
314         }
315         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
316         FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
317         wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
318                 egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
319         DataStoreHelper.submitToDs(wTx);
320         scheduleUpdate();
321     }
322
323     // **************
324     // DataTreeChangeListener<ResolvedPolicy>
325     // **************
326
327     @Override
328     public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
329         scheduleUpdate();
330     }
331
332     // *************
333     // PolicyManager
334     // *************
335
336     /**
337      * Set the learning mode to the specified value
338      * @param learningMode the learning mode to set
339      */
340     public void setLearningMode(LearningMode learningMode) {
341         // No-op for now
342     }
343
344     // **************
345     // Implementation
346     // **************
347
348     private void scheduleUpdate() {
349         if (switchManager != null) {
350             LOG.trace("Scheduling flow update task");
351             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
352         }
353     }
354
355     /**
356      * Update the flows on a particular switch
357      */
358     private class SwitchFlowUpdateTask implements Callable<Void> {
359         private final OfWriter ofWriter;
360
361         public SwitchFlowUpdateTask(OfWriter ofWriter) {
362             this.ofWriter = ofWriter;
363         }
364
365         @Override
366         public Void call() throws Exception {
367             OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
368             if (ofCtx.getCurrentPolicy() == null)
369                 return null;
370             List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
371             for (NodeId node : switchManager.getReadySwitches()) {
372                 for (OfTable table : flowPipeline) {
373                     try {
374                         table.sync(node, ofWriter);
375                     } catch (Exception e) {
376                         LOG.error("Failed to write Openflow table {}",
377                                 table.getClass().getSimpleName(), e);
378                     }
379                 }
380             }
381             return null;
382         }
383     }
384
385     /**
386      * Update all flows on all switches as needed.  Note that this will block
387      * one of the threads on the executor.
388      */
389     private class FlowUpdateTask implements Runnable {
390         @Override
391         public void run() {
392             LOG.debug("Beginning flow update task");
393
394             CompletionService<Void> ecs
395                 = new ExecutorCompletionService<>(executor);
396
397             OfWriter ofWriter = new OfWriter();
398
399             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
400             ecs.submit(swut);
401
402             try {
403                 ecs.take().get();
404                 // Current gbp flow must be independent, find out where this run() ends,
405                 // set flows to one field and reset another
406                 Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
407                 actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
408                 previousGbpFlows = actualGbpFlows;
409             } catch (InterruptedException | ExecutionException e) {
410                 LOG.error("Failed to update flow tables", e);
411             }
412             LOG.debug("Flow update completed");
413         }
414     }
415
416     @Override
417     public void close() throws IOException {
418         if (registerDataTreeChangeListener != null)
419             registerDataTreeChangeListener.close();
420         // TODO unregister classifier and action instance validators
421     }
422
423 }