90d88c9488ab7f96223b7c7cd59e587a1b986df1
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
42 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
47 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import java.util.ArrayList;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Map.Entry;
64 import java.util.Set;
65 import java.util.concurrent.Callable;
66 import java.util.concurrent.CompletionService;
67 import java.util.concurrent.ConcurrentHashMap;
68 import java.util.concurrent.ConcurrentMap;
69 import java.util.concurrent.ExecutionException;
70 import java.util.concurrent.ExecutorCompletionService;
71 import java.util.concurrent.ScheduledExecutorService;
72 import java.util.concurrent.TimeUnit;
73
74 /**
75  * Manage policies on switches by subscribing to updates from the
76  * policy resolver and information about endpoints from the endpoint
77  * registry
78  */
79 public class PolicyManager
80      implements SwitchListener, PolicyListener, EndpointListener {
81     private static final Logger LOG =
82             LoggerFactory.getLogger(PolicyManager.class);
83
84     private short tableOffset;
85     private final short TABLEID_PORTSECURITY = 0;
86     private final short TABLEID_INGRESS_NAT = (short) (tableOffset+1);
87     private final short TABLEID_SOURCE_MAPPER = (short) (tableOffset+2);
88     private final short TABLEID_DESTINATION_MAPPER = (short) (tableOffset+3);
89     private final short TABLEID_POLICY_ENFORCER = (short) (tableOffset+4);
90     private final short TABLEID_EGRESS_NAT = (short) (tableOffset+5);
91     private final short TABLEID_EXTERNAL_MAPPER = (short) (tableOffset+6);
92
93     private static MacAddress externaMacAddress;
94
95     private final SwitchManager switchManager;
96     private final PolicyResolver policyResolver;
97
98     private final PolicyScope policyScope;
99
100     private final ScheduledExecutorService executor;
101     private final SingletonTask flowUpdateTask;
102     private final DataBroker dataBroker;
103
104     /**
105      * The flow tables that make up the processing pipeline
106      */
107     private final List<? extends OfTable> flowPipeline;
108
109     /**
110      * The delay before triggering the flow update task in response to an
111      * event in milliseconds.
112      */
113     private final static int FLOW_UPDATE_DELAY = 250;
114
115     public PolicyManager(DataBroker dataBroker,
116                          PolicyResolver policyResolver,
117                          SwitchManager switchManager,
118                          EndpointManager endpointManager,
119                          RpcProviderRegistry rpcRegistry,
120                          ScheduledExecutorService executor,
121                          short tableOffset,
122                          MacAddress externalRouterMac) {
123         super();
124         this.switchManager = switchManager;
125         this.executor = executor;
126         this.policyResolver = policyResolver;
127         this.dataBroker = dataBroker;
128         this.tableOffset=tableOffset;
129         this.externaMacAddress=externalRouterMac;
130
131
132         if (dataBroker != null) {
133             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
134             t.put(LogicalDatastoreType.OPERATIONAL,
135                   InstanceIdentifier
136                       .builder(SubjectFeatureDefinitions.class)
137                       .build(),
138                   SubjectFeatures.OF_OVERLAY_FEATURES);
139             t.submit();
140         }
141
142         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
143             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
144         }
145
146         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
147                                         this, policyResolver, switchManager,
148                                         endpointManager, executor);
149
150         flowPipeline = ImmutableList.of(new PortSecurity(ctx,TABLEID_PORTSECURITY),
151                                         new GroupTable(ctx),
152                                         new IngressNatMapper(ctx,TABLEID_INGRESS_NAT),
153                                         new SourceMapper(ctx,TABLEID_SOURCE_MAPPER),
154                                         new DestinationMapper(ctx,TABLEID_DESTINATION_MAPPER),
155                                         new PolicyEnforcer(ctx,TABLEID_POLICY_ENFORCER),
156                                         new EgressNatMapper(ctx,TABLEID_EGRESS_NAT),
157                                         new ExternalMapper(ctx,TABLEID_EXTERNAL_MAPPER)
158                                         );
159
160         policyScope = policyResolver.registerListener(this);
161         if (switchManager != null)
162             switchManager.registerListener(this);
163         endpointManager.registerListener(this);
164
165         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
166         scheduleUpdate();
167
168         LOG.debug("Initialized OFOverlay policy manager");
169     }
170
171     // **************
172     // SwitchListener
173     // **************
174
175
176     public short getTABLEID_PORTSECURITY() {
177         return TABLEID_PORTSECURITY;
178     }
179
180
181     public short getTABLEID_INGRESS_NAT() {
182         return TABLEID_INGRESS_NAT;
183     }
184
185
186     public short getTABLEID_SOURCE_MAPPER() {
187         return TABLEID_SOURCE_MAPPER;
188     }
189
190
191     public short getTABLEID_DESTINATION_MAPPER() {
192         return TABLEID_DESTINATION_MAPPER;
193     }
194
195
196     public short getTABLEID_POLICY_ENFORCER() {
197         return TABLEID_POLICY_ENFORCER;
198     }
199
200
201     public short getTABLEID_EGRESS_NAT() {
202         return TABLEID_EGRESS_NAT;
203     }
204
205
206     public short getTABLEID_EXTERNAL_MAPPER() {
207         return TABLEID_EXTERNAL_MAPPER;
208     }
209
210     @Override
211     public void switchReady(final NodeId nodeId) {
212         scheduleUpdate();
213     }
214
215     @Override
216     public void switchRemoved(NodeId sw) {
217         // XXX TODO purge switch flows
218         scheduleUpdate();
219     }
220
221     @Override
222     public void switchUpdated(NodeId sw) {
223         scheduleUpdate();
224     }
225
226     // ****************
227     // EndpointListener
228     // ****************
229
230     @Override
231     public void endpointUpdated(EpKey epKey) {
232         scheduleUpdate();
233     }
234
235     @Override
236     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
237         scheduleUpdate();
238     }
239
240     @Override
241     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
242         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
243         scheduleUpdate();
244     }
245
246     // **************
247     // PolicyListener
248     // **************
249
250     @Override
251     public void policyUpdated(Set<EgKey> updatedConsumers) {
252         scheduleUpdate();
253     }
254
255     // *************
256     // PolicyManager
257     // *************
258
259     /**
260      * Set the learning mode to the specified value
261      * @param learningMode the learning mode to set
262      */
263     public void setLearningMode(LearningMode learningMode) {
264         // No-op for now
265     }
266
267     public static MacAddress getExternaMacAddress() {
268         return externaMacAddress;
269     }
270
271     // **************
272     // Implementation
273     // **************
274
275     public class FlowMap{
276         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
277
278         public FlowMap() {
279         }
280
281         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
282             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
283             if(this.flowMap.get(tableIid) == null) {
284                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
285                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
286             }
287             return this.flowMap.get(tableIid);
288         }
289
290         public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
291             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
292             // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
293             List<Flow> flows = tableBuilder.getFlow();
294             Set<Equivalence.Wrapper<Flow>> wrappedFlows =
295                     new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
296
297             Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
298
299             if (!wrappedFlows.contains(wFlow)) {
300                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
301             }
302         }
303
304         public void commitToDataStore() {
305             if (dataBroker != null) {
306                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
307                     try {
308                         /*
309                          * Get the currently configured flows for
310                          * this table.
311                          */
312                         updateFlowTable(entry);
313                     } catch (Exception e) {
314                         LOG.warn("Couldn't read flow table {}", entry.getKey());
315                     }
316                 }
317             }
318         }
319
320         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
321                                      TableBuilder> entry)  throws Exception {
322             // flows to update
323             Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
324             // flows currently in the table
325             Set<Flow> curr = new HashSet<>();
326
327             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
328             Optional<Table> r =
329                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
330
331             if (r.isPresent()) {
332                 Table currentTable = r.get();
333                 curr = new HashSet<>(currentTable.getFlow());
334             }
335
336             // Sets with custom equivalence rules
337             Set<Equivalence.Wrapper<Flow>> oldFlows =
338                     new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
339             Set<Equivalence.Wrapper<Flow>> updatedFlows =
340                     new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
341
342             // what is still there but was not updated, needs to be deleted
343             Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
344                     Sets.difference(oldFlows, updatedFlows);
345             // new flows (they were not there before)
346             Sets.SetView<Equivalence.Wrapper<Flow>> additions =
347                     Sets.difference(updatedFlows, oldFlows);
348
349             if (!deletions.isEmpty()) {
350                 for (Equivalence.Wrapper<Flow> wf: deletions) {
351                     Flow f = wf.get();
352                     if (f != null) {
353                         t.delete(LogicalDatastoreType.CONFIGURATION,
354                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
355                     }
356                 }
357             }
358             if (!additions.isEmpty()) {
359                 for (Equivalence.Wrapper<Flow> wf: additions) {
360                     Flow f = wf.get();
361                     if (f != null) {
362                         t.put(LogicalDatastoreType.CONFIGURATION,
363                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
364                     }
365                 }
366             }
367             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
368             Futures.addCallback(f, new FutureCallback<Void>() {
369                 @Override
370                 public void onFailure(Throwable t) {
371                     LOG.error("Could not write flow table {}", t);
372                 }
373
374                 @Override
375                 public void onSuccess(Void result) {
376                     LOG.debug("Flow table updated.");
377                 }
378             });
379         }
380
381         private void purgeFromDataStore() {
382             // TODO: tbachman: Remove for Lithium -- this is a workaround
383             //       where some flow-mods aren't getting installed
384             //       on vSwitches when changing L3 contexts
385             WriteTransaction d = dataBroker.newWriteOnlyTransaction();
386
387             for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
388                 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
389             }
390
391             CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
392             Futures.addCallback(fu, new FutureCallback<Void>() {
393                 @Override
394                 public void onFailure(Throwable th) {
395                     LOG.error("Could not write flow table.", th);
396                 }
397
398                 @Override
399                 public void onSuccess(Void result) {
400                     LOG.debug("Flow table updated.");
401                 }
402             });
403         }
404
405     }
406
407     private void scheduleUpdate() {
408         if (switchManager != null) {
409             LOG.trace("Scheduling flow update task");
410             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
411         }
412     }
413
414     /**
415      * Update the flows on a particular switch
416      */
417     private class SwitchFlowUpdateTask implements Callable<Void> {
418         private FlowMap flowMap;
419
420         public SwitchFlowUpdateTask(FlowMap flowMap) {
421             super();
422             this.flowMap = flowMap;
423         }
424
425         @Override
426         public Void call() throws Exception {
427             for (NodeId node : switchManager.getReadySwitches()) {
428                 PolicyInfo info = policyResolver.getCurrentPolicy();
429                 if (info == null)
430                     return null;
431                 for (OfTable table : flowPipeline) {
432                     try {
433                         table.update(node, info, flowMap);
434                     } catch (Exception e) {
435                         LOG.error("Failed to write flow table {}",
436                                 table.getClass().getSimpleName(), e);
437                     }
438                 }
439             }
440             return null;
441         }
442     }
443
444     /**
445      * Update all flows on all switches as needed.  Note that this will block
446      * one of the threads on the executor.
447      */
448     private class FlowUpdateTask implements Runnable {
449         @Override
450         public void run() {
451             LOG.debug("Beginning flow update task");
452
453             CompletionService<Void> ecs
454                 = new ExecutorCompletionService<>(executor);
455             int n = 0;
456
457             FlowMap flowMap = new FlowMap();
458
459             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
460             ecs.submit(swut);
461             n+=1;
462
463             for (int i = 0; i < n; i++) {
464                 try {
465                     ecs.take().get();
466                     flowMap.commitToDataStore();
467                 } catch (InterruptedException | ExecutionException e) {
468                     LOG.error("Failed to update flow tables", e);
469                 }
470             }
471             LOG.debug("Flow update completed");
472         }
473     }
474
475
476
477
478
479 }