2b8169b995186a738ab37d9c440c1daff39f8bce
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map.Entry;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorCompletionService;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21
22 import com.google.common.base.Function;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.util.concurrent.AsyncFunction;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.endpoint.EndpointManager;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
43 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
44 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
45 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
46 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
47 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
48 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
49 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
50 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
51 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
52 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
53 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableId;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * Manage policies on switches by subscribing to updates from the
65  * policy resolver and information about endpoints from the endpoint
66  * registry
67  */
68 public class PolicyManager
69      implements SwitchListener, PolicyListener, EndpointListener {
70     private static final Logger LOG =
71             LoggerFactory.getLogger(PolicyManager.class);
72
73     private short tableOffset;
74     private static final short TABLEID_PORTSECURITY = 0;
75     private static final short TABLEID_INGRESS_NAT =  1;
76     private static final short TABLEID_SOURCE_MAPPER = 2;
77     private static final short TABLEID_DESTINATION_MAPPER = 3;
78     private static final short TABLEID_POLICY_ENFORCER = 4;
79     private static final short TABLEID_EGRESS_NAT = 5;
80     private static final short TABLEID_EXTERNAL_MAPPER = 6;
81
82     private final SwitchManager switchManager;
83     private final PolicyResolver policyResolver;
84
85     private final PolicyScope policyScope;
86
87     private final ScheduledExecutorService executor;
88     private final SingletonTask flowUpdateTask;
89     private final DataBroker dataBroker;
90     private final OfContext ofCtx;
91     /**
92      * The flow tables that make up the processing pipeline
93      */
94     private List<? extends OfTable> flowPipeline;
95
96     /**
97      * The delay before triggering the flow update task in response to an
98      * event in milliseconds.
99      */
100     private final static int FLOW_UPDATE_DELAY = 250;
101
102     public PolicyManager(DataBroker dataBroker,
103                          PolicyResolver policyResolver,
104                          SwitchManager switchManager,
105                          EndpointManager endpointManager,
106                          RpcProviderRegistry rpcRegistry,
107                          ScheduledExecutorService executor,
108                          short tableOffset) {
109         super();
110         this.switchManager = switchManager;
111         this.executor = executor;
112         this.policyResolver = policyResolver;
113         this.dataBroker = dataBroker;
114         this.tableOffset = tableOffset;
115         try {
116             // to validate against model
117             verifyMaxTableId(tableOffset);
118         } catch (IllegalArgumentException e) {
119             throw new IllegalArgumentException("Failed to start OF-Overlay renderer\n."
120                     + "Max. table ID would be out of range. Check config-subsystem.\n{}", e);
121         }
122
123         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
124             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
125         }
126
127         ofCtx = new OfContext(dataBroker, rpcRegistry,
128                                         this, policyResolver, switchManager,
129                                         endpointManager, executor);
130
131         flowPipeline = createFlowPipeline();
132
133         policyScope = policyResolver.registerListener(this);
134         if (switchManager != null)
135             switchManager.registerListener(this);
136         endpointManager.registerListener(this);
137
138         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
139         scheduleUpdate();
140
141         LOG.debug("Initialized OFOverlay policy manager");
142     }
143
144     private List<? extends OfTable> createFlowPipeline() {
145         // TODO - PORTSECURITY is kept in table 0.
146         // According to openflow spec,processing on vSwitch always starts from table 0.
147         // Packets will be droped if table 0 is empty.
148         // Alternative workaround - table-miss flow entries in table 0.
149         return ImmutableList.of(new PortSecurity(ofCtx, (short) 0),
150                                         new GroupTable(ofCtx),
151                                         new IngressNatMapper(ofCtx, getTABLEID_INGRESS_NAT()),
152                                         new SourceMapper(ofCtx, getTABLEID_SOURCE_MAPPER()),
153                                         new DestinationMapper(ofCtx, getTABLEID_DESTINATION_MAPPER()),
154                                         new PolicyEnforcer(ofCtx, getTABLEID_POLICY_ENFORCER()),
155                                         new EgressNatMapper(ofCtx, getTABLEID_EGRESS_NAT()),
156                                         new ExternalMapper(ofCtx, getTABLEID_EXTERNAL_MAPPER())
157                                         );
158     }
159
160     /**
161      * @param tableOffset the new offset value
162      * @return {@link ListenableFuture} to indicate that tables have been synced
163      */
164     public ListenableFuture<Void> changeOpenFlowTableOffset(final short tableOffset) {
165         try {
166             verifyMaxTableId(tableOffset);
167         } catch (IllegalArgumentException e) {
168             LOG.error("Cannot update table offset. Max. table ID would be out of range.\n{}", e);
169             // TODO - invalid offset value remains in conf DS
170             // It's not possible to validate offset value by using constrains in model,
171             // because number of tables in pipeline varies.
172             return Futures.immediateFuture(null);
173         }
174         List<Short> tableIDs = getTableIDs();
175         this.tableOffset = tableOffset;
176         return Futures.transform(removeUnusedTables(tableIDs), new Function<Void, Void>() {
177
178             @Override
179             public Void apply(Void tablesRemoved) {
180                 flowPipeline = createFlowPipeline();
181                 scheduleUpdate();
182                 return null;
183             }
184         });
185     }
186
187     /**
188      * @param  tableIDs - IDs of tables to delete
189      * @return ListenableFuture<Void> - which will be filled when clearing is done
190      */
191     private ListenableFuture<Void> removeUnusedTables(final List<Short> tableIDs) {
192         List<ListenableFuture<Void>> checkList = new ArrayList<>();
193         final ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
194         for (Short tableId : tableIDs) {
195             for (NodeId nodeId : switchManager.getReadySwitches()) {
196                 final InstanceIdentifier<Table> tablePath = FlowUtils.createTablePath(nodeId, tableId);
197                 checkList.add(deteleTableIfExists(rwTx, tablePath));
198             }
199         }
200         ListenableFuture<List<Void>> allAsListFuture = Futures.allAsList(checkList);
201         return Futures.transform(allAsListFuture, new AsyncFunction<List<Void>, Void>() {
202
203             @Override
204             public ListenableFuture<Void> apply(List<Void> readyToSubmit) {
205                 return rwTx.submit();
206             }
207         });
208     }
209
210     private List<Short> getTableIDs() {
211         List<Short> tableIds = new ArrayList<>();
212         tableIds.add(getTABLEID_PORTSECURITY());
213         tableIds.add(getTABLEID_INGRESS_NAT());
214         tableIds.add(getTABLEID_SOURCE_MAPPER());
215         tableIds.add(getTABLEID_DESTINATION_MAPPER());
216         tableIds.add(getTABLEID_POLICY_ENFORCER());
217         tableIds.add(getTABLEID_EGRESS_NAT());
218         tableIds.add(getTABLEID_EXTERNAL_MAPPER());
219         return tableIds;
220     }
221
222     private ListenableFuture<Void> deteleTableIfExists(final ReadWriteTransaction rwTx, final InstanceIdentifier<Table> tablePath){
223     return Futures.transform(rwTx.read(LogicalDatastoreType.CONFIGURATION, tablePath), new Function<Optional<Table>, Void>() {
224
225         @Override
226         public Void apply(Optional<Table> optTable) {
227             if(optTable.isPresent()){
228                 rwTx.delete(LogicalDatastoreType.CONFIGURATION, tablePath);
229             }
230             return null;
231         }});
232     }
233
234     // **************
235     // SwitchListener
236     // **************
237
238     public short getTABLEID_PORTSECURITY() {
239         return (short)(tableOffset+TABLEID_PORTSECURITY);
240     }
241
242
243     public short getTABLEID_INGRESS_NAT() {
244         return (short)(tableOffset+TABLEID_INGRESS_NAT);
245     }
246
247
248     public short getTABLEID_SOURCE_MAPPER() {
249         return (short)(tableOffset+TABLEID_SOURCE_MAPPER);
250     }
251
252
253     public short getTABLEID_DESTINATION_MAPPER() {
254         return (short)(tableOffset+TABLEID_DESTINATION_MAPPER);
255     }
256
257
258     public short getTABLEID_POLICY_ENFORCER() {
259         return (short)(tableOffset+TABLEID_POLICY_ENFORCER);
260     }
261
262
263     public short getTABLEID_EGRESS_NAT() {
264         return (short)(tableOffset+TABLEID_EGRESS_NAT);
265     }
266
267
268     public short getTABLEID_EXTERNAL_MAPPER() {
269         return (short)(tableOffset+TABLEID_EXTERNAL_MAPPER);
270     }
271
272
273     public TableId verifyMaxTableId(short tableOffset) {
274         return new TableId((short)(tableOffset+TABLEID_EXTERNAL_MAPPER));
275     }
276
277     @Override
278     public void switchReady(final NodeId nodeId) {
279         scheduleUpdate();
280     }
281
282     @Override
283     public void switchRemoved(NodeId sw) {
284         // XXX TODO purge switch flows
285         scheduleUpdate();
286     }
287
288     @Override
289     public void switchUpdated(NodeId sw) {
290         scheduleUpdate();
291     }
292
293     // ****************
294     // EndpointListener
295     // ****************
296
297     @Override
298     public void endpointUpdated(EpKey epKey) {
299         scheduleUpdate();
300     }
301
302     @Override
303     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
304         scheduleUpdate();
305     }
306
307     @Override
308     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
309         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
310         scheduleUpdate();
311     }
312
313     // **************
314     // PolicyListener
315     // **************
316
317     @Override
318     public void policyUpdated(Set<EgKey> updatedConsumers) {
319         scheduleUpdate();
320     }
321
322     // *************
323     // PolicyManager
324     // *************
325
326     /**
327      * Set the learning mode to the specified value
328      * @param learningMode the learning mode to set
329      */
330     public void setLearningMode(LearningMode learningMode) {
331         // No-op for now
332     }
333
334     // **************
335     // Implementation
336     // **************
337
338     private void scheduleUpdate() {
339         if (switchManager != null) {
340             LOG.trace("Scheduling flow update task");
341             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
342         }
343     }
344
345     /**
346      * Update the flows on a particular switch
347      */
348     private class SwitchFlowUpdateTask implements Callable<Void> {
349         private OfWriter ofWriter;
350
351         public SwitchFlowUpdateTask(OfWriter ofWriter) {
352             super();
353             this.ofWriter = ofWriter;
354         }
355
356         @Override
357         public Void call() throws Exception {
358             for (NodeId node : switchManager.getReadySwitches()) {
359                 PolicyInfo info = policyResolver.getCurrentPolicy();
360                 if (info == null)
361                     return null;
362                 for (OfTable table : flowPipeline) {
363                     try {
364                         table.update(node, info, ofWriter);
365                     } catch (Exception e) {
366                         LOG.error("Failed to write Openflow table {}",
367                                 table.getClass().getSimpleName(), e);
368                     }
369                 }
370             }
371             return null;
372         }
373     }
374
375     /**
376      * Update all flows on all switches as needed.  Note that this will block
377      * one of the threads on the executor.
378      */
379     private class FlowUpdateTask implements Runnable {
380         @Override
381         public void run() {
382             LOG.debug("Beginning flow update task");
383
384             CompletionService<Void> ecs
385                 = new ExecutorCompletionService<>(executor);
386             int n = 0;
387
388             OfWriter ofWriter = new OfWriter();
389
390             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(ofWriter);
391             ecs.submit(swut);
392             n+=1;
393
394             for (int i = 0; i < n; i++) {
395                 try {
396                     ecs.take().get();
397                     ofWriter.commitToDataStore(dataBroker);
398                 } catch (InterruptedException | ExecutionException e) {
399                     LOG.error("Failed to update flow tables", e);
400                 }
401             }
402             LOG.debug("Flow update completed");
403         }
404     }
405
406
407
408
409
410 }