Bug 5213 - Missing segmentation-id for physical networks
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CompletionService;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.groupbasedpolicy.dto.EgKey;
33 import org.opendaylight.groupbasedpolicy.dto.EpKey;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
47 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
48 import org.opendaylight.groupbasedpolicy.util.IidFactory;
49 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroup;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.renderer.rev151103.renderers.renderer.interests.followed.tenants.followed.tenant.FollowedEndpointGroupBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.ResolvedPolicies;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.resolved.policy.rev150828.resolved.policies.ResolvedPolicy;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
59 import org.opendaylight.yangtools.concepts.ListenerRegistration;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 import com.google.common.base.Function;
65 import com.google.common.base.Optional;
66 import com.google.common.collect.ImmutableList;
67 import com.google.common.util.concurrent.AsyncFunction;
68 import com.google.common.util.concurrent.Futures;
69 import com.google.common.util.concurrent.ListenableFuture;
70
71 /**
72  * Manage policies on switches by subscribing to updates from the
73  * policy resolver and information about endpoints from the endpoint
74  * registry
75  */
76 public class PolicyManager
77      implements SwitchListener, EndpointListener, DataTreeChangeListener<ResolvedPolicy>, Closeable {
78     private static final Logger LOG =
79             LoggerFactory.getLogger(PolicyManager.class);
80
81     private Map<InstanceIdentifier<Table>, TableBuilder> actualGbpFlows = new HashMap<>();
82     private Map<InstanceIdentifier<Table>, TableBuilder> previousGbpFlows  = new HashMap<>();
83
84     private short tableOffset;
85     private static final short TABLEID_PORTSECURITY = 0;
86     private static final short TABLEID_INGRESS_NAT =  1;
87     private static final short TABLEID_SOURCE_MAPPER = 2;
88     private static final short TABLEID_DESTINATION_MAPPER = 3;
89     private static final short TABLEID_POLICY_ENFORCER = 4;
90     private static final short TABLEID_EGRESS_NAT = 5;
91     private static final short TABLEID_EXTERNAL_MAPPER = 6;
92
93     private final SwitchManager switchManager;
94     private final EndpointManager endpointManager;
95
96     private final ListenerRegistration<PolicyManager> registerDataTreeChangeListener;
97
98     private final ScheduledExecutorService executor;
99     private final SingletonTask flowUpdateTask;
100     private final DataBroker dataBroker;
101
102     /**
103      * The delay before triggering the flow update task in response to an
104      * event in milliseconds.
105      */
106     private final static int FLOW_UPDATE_DELAY = 250;
107
108     public PolicyManager(DataBroker dataBroker,
109                          SwitchManager switchManager,
110                          EndpointManager endpointManager,
111                          ScheduledExecutorService executor,
112                          short tableOffset) {
113         super();
114         this.switchManager = switchManager;
115         this.executor = executor;
116         this.dataBroker = dataBroker;
117         this.tableOffset = tableOffset;
118         try {
119             // to validate against model
120             verifyMaxTableId(tableOffset);
121         } catch (IllegalArgumentException e) {
122             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
123                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
124         }
125
126         if (dataBroker != null) {
127             registerDataTreeChangeListener = dataBroker.registerDataTreeChangeListener(
128                     new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
129                             InstanceIdentifier.builder(ResolvedPolicies.class).child(ResolvedPolicy.class).build()),
130                     this);
131         } else {
132             registerDataTreeChangeListener = null;
133             LOG.error("DataBroker is null. Listener for {} was not registered.",
134                     ResolvedPolicy.class.getCanonicalName());
135         }
136         if (switchManager != null)
137             switchManager.registerListener(this);
138         this.endpointManager = endpointManager;
139         endpointManager.registerListener(this);
140
141         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
142         scheduleUpdate();
143
144         LOG.debug("Initialized OFOverlay policy manager");
145     }
146
147     private List<? extends OfTable> createFlowPipeline(OfContext ofCtx) {
148         // TODO - PORTSECURITY is kept in table 0.
149         // According to openflow spec,processing on vSwitch always starts from table 0.
150         // Packets will be droped if table 0 is empty.
151         // Alternative workaround - table-miss flow entries in table 0.
152         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
153                                         new GroupTable(ofCtx),
154                                         new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
155                                         new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
156                                         new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
157                                         new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
158                                         new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
159                                         new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
160                                         );
161     }
162
163     /**
164      * @param tableOffset the new offset value
165      * @return {@link ListenableFuture} to indicate that tables have been synced
166      */
167     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
168         try {
169             verifyMaxTableId(tableOffset);
170         } catch (IllegalArgumentException e) {
171             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
172             // TODO - invalid offset value remains in conf DS
173             // It's not possible to validate offset value by using constrains in model,
174             // because number of tables in pipeline varies.
175             return Futures.immediateFuture(null);
176         }
177         List<Short> tableIDs = getTableIDs();
178         this.tableOffset = tableOffset;
179         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
180
181             @Override
182             public Void apply(Void tablesRemoved) {
183                 scheduleUpdate();
184                 return null;
185             }
186         });
187     }
188
189     /**
190      * @param  tableIDs - IDs of tables to delete
191      * @return ListenableFuture<Void> - which will be filled when clearing is done
192      */
193     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
194         List<ListenableFuture<Void>> checkList = new ArrayList<>();
195         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
196         for (Short tableId : tableIDs) {
197             for (NodeId nodeId : switchManager.getReadySwitches()) {
198                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
199                 checkList.add(deleteTableIfExists(rwTx, tablePath));
200             }
201         }
202         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
203         return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
204
205             @Override
206             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
207                 return rwTx.submit();
208             }
209         });
210     }
211
212     private List<Short> getTableIDs() {
213         List<Short> tableIds = new ArrayList<>();
214         tableIds.add(getTABLEID_PORTSECURITY());
215         tableIds.add(getTABLEID_INGRESS_NAT());
216         tableIds.add(getTABLEID_SOURCE_MAPPER());
217         tableIds.add(getTABLEID_DESTINATION_MAPPER());
218         tableIds.add(getTABLEID_POLICY_ENFORCER());
219         tableIds.add(getTABLEID_EGRESS_NAT());
220         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
221         return tableIds;
222     }
223
224     private ListenableFuture<Void> deleteTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
225     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
226
227         @Override
228         public Void apply(Optional<Table> optTable) {
229             if(optTable.isPresent()){
230                 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
231             }
232             return null;
233         }});
234     }
235
236     // **************
237     // SwitchListener
238     // **************
239
240     public short getTABLEID_PORTSECURITY() {
241         return (short)(tableOffset+TABLEID_PORTSECURITY);
242     }
243
244
245     public short getTABLEID_INGRESS_NAT() {
246         return (short)(tableOffset+TABLEID_INGRESS_NAT);
247     }
248
249
250     public short getTABLEID_SOURCE_MAPPER() {
251         return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
252     }
253
254
255     public short getTABLEID_DESTINATION_MAPPER() {
256         return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
257     }
258
259
260     public short getTABLEID_POLICY_ENFORCER() {
261         return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
262     }
263
264
265     public short getTABLEID_EGRESS_NAT() {
266         return (short)(tableOffset+TABLEID_EGRESS_NAT);
267     }
268
269
270     public short getTABLEID_EXTERNAL_MAPPER() {
271         return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
272     }
273
274
275     public TableId verifyMaxTableId(short tableOffset) {
276         return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
277     }
278
279     @Override
280     public void switchReady(final NodeId nodeId) {
281         scheduleUpdate();
282     }
283
284     @Override
285     public void switchRemoved(NodeId sw) {
286         // XXX TODO purge switch flows
287         scheduleUpdate();
288     }
289
290     @Override
291     public void switchUpdated(NodeId sw) {
292         scheduleUpdate();
293     }
294
295     // ****************
296     // EndpointListener
297     // ****************
298
299     @Override
300     public void endpointUpdated(EpKey epKey) {
301         scheduleUpdate();
302     }
303
304     @Override
305     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
306         scheduleUpdate();
307     }
308
309     @Override
310     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
311         // TODO a renderer should remove followed-EPG and followed-tenant at some point
312         if (dataBroker == null) {
313             LOG.error("DataBroker is null. Cannot write followed-epg {}", epKey);
314             return;
315         }
316         WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
317         FollowedEndpointGroup followedEpg = new FollowedEndpointGroupBuilder().setId(egKey.getEgId()).build();
318         wTx.put(LogicalDatastoreType.OPERATIONAL, IidFactory.followedEndpointgroupIid(OFOverlayRenderer.RENDERER_NAME,
319                 egKey.getTenantId(), egKey.getEgId()), followedEpg, true);
320         DataStoreHelper.submitToDs(wTx);
321         scheduleUpdate();
322     }
323
324     // **************
325     // DataTreeChangeListener<ResolvedPolicy>
326     // **************
327
328     @Override
329     public void onDataTreeChanged(Collection<DataTreeModification<ResolvedPolicy>> changes) {
330         scheduleUpdate();
331     }
332
333     // *************
334     // PolicyManager
335     // *************
336
337     /**
338      * Set the learning mode to the specified value
339      * @param learningMode the learning mode to set
340      */
341     public void setLearningMode(LearningMode learningMode) {
342         // No-op for now
343     }
344
345     // **************
346     // Implementation
347     // **************
348
349     private void scheduleUpdate() {
350         if (switchManager != null) {
351             LOG.trace("Scheduling flow update task");
352
353             // Mark all existing flows as previous - will be compared with new ones
354             previousGbpFlows = actualGbpFlows;
355             actualGbpFlows = new HashMap<>();
356
357             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
358         }
359     }
360
361     /**
362      * Update the flows on a particular switch
363      */
364     private class SwitchFlowUpdateTask implements Callable<Void> {
365         private final OfWriter ofWriter;
366
367         public SwitchFlowUpdateTask(OfWriter ofWriter) {
368             this.ofWriter = ofWriter;
369         }
370
371         @Override
372         public Void call() throws Exception {
373             OfContext ofCtx = new OfContext(dataBroker, PolicyManager.this, switchManager, endpointManager, executor);
374             if (ofCtx.getCurrentPolicy() == null)
375                 return null;
376             List<? extends OfTable> flowPipeline = createFlowPipeline(ofCtx);
377             for (NodeId node : switchManager.getReadySwitches()) {
378                 for (OfTable table : flowPipeline) {
379                     try {
380                         table.sync(node, ofWriter);
381                     } catch (Exception e) {
382                         LOG.error("Failed to write Openflow table {}",
383                                 table.getClass().getSimpleName(), e);
384                     }
385                 }
386             }
387             return null;
388         }
389     }
390
391     /**
392      * Update all flows on all switches as needed.  Note that this will block
393      * one of the threads on the executor.
394      */
395     private class FlowUpdateTask implements Runnable {
396         @Override
397         public void run() {
398             LOG.debug("Beginning flow update task");
399
400             CompletionService<Void> ecs
401                 = new ExecutorCompletionService<>(executor);
402
403             OfWriter ofWriter = new OfWriter();
404
405             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
406             ecs.submit(swut);
407
408             try {
409                 ecs.take().get();
410                 // Current gbp flow must be independent, find out where this run() ends,
411                 // set flows to one field and reset another
412                 actualGbpFlows.putAll(ofWriter.commitToDataStore(dataBroker, previousGbpFlows));
413             } catch (InterruptedException | ExecutionException e) {
414                 LOG.error("Failed to update flow tables", e);
415             }
416             LOG.debug("Flow update completed");
417         }
418     }
419
420     @Override
421     public void close() throws IOException {
422         if (registerDataTreeChangeListener != null)
423             registerDataTreeChangeListener.close();
424         // TODO unregister classifier and action instance validators
425     }
426
427 }