Bug 3244 - SFC Improvements for distributed classifier, robustness
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.ExternalMapper;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.EgressNatMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.IngressNatMapper;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
42 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
47 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev100924.MacAddress;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import java.util.ArrayList;
61 import java.util.HashSet;
62 import java.util.List;
63 import java.util.Map.Entry;
64 import java.util.Set;
65 import java.util.concurrent.Callable;
66 import java.util.concurrent.CompletionService;
67 import java.util.concurrent.ConcurrentHashMap;
68 import java.util.concurrent.ConcurrentMap;
69 import java.util.concurrent.ExecutionException;
70 import java.util.concurrent.ExecutorCompletionService;
71 import java.util.concurrent.ScheduledExecutorService;
72 import java.util.concurrent.TimeUnit;
73
74 /**
75  * Manage policies on switches by subscribing to updates from the
76  * policy resolver and information about endpoints from the endpoint
77  * registry
78  */
79 public class PolicyManager
80      implements SwitchListener, PolicyListener, EndpointListener {
81     private static final Logger LOG =
82             LoggerFactory.getLogger(PolicyManager.class);
83
84     private short tableOffset;
85     private final short TABLEID_PORTSECURITY = 0;
86     private final short TABLEID_INGRESS_NAT = (short) (tableOffset+1);
87     private final short TABLEID_SOURCE_MAPPER = (short) (tableOffset+2);
88     private final short TABLEID_DESTINATION_MAPPER = (short) (tableOffset+3);
89     private final short TABLEID_POLICY_ENFORCER = (short) (tableOffset+4);
90     private final short TABLEID_EGRESS_NAT = (short) (tableOffset+5);
91     private final short TABLEID_EXTERNAL_MAPPER = (short) (tableOffset+6);
92
93     private static MacAddress externaMacAddress;
94
95     private final SwitchManager switchManager;
96     private final PolicyResolver policyResolver;
97
98     private final PolicyScope policyScope;
99
100     private final ScheduledExecutorService executor;
101     private final SingletonTask flowUpdateTask;
102     private final DataBroker dataBroker;
103
104     /**
105      * The flow tables that make up the processing pipeline
106      */
107     private final List<? extends OfTable> flowPipeline;
108
109     /**
110      * The delay before triggering the flow update task in response to an
111      * event in milliseconds.
112      */
113     private final static int FLOW_UPDATE_DELAY = 250;
114
115     public PolicyManager(DataBroker dataBroker,
116                          PolicyResolver policyResolver,
117                          SwitchManager switchManager,
118                          EndpointManager endpointManager,
119                          RpcProviderRegistry rpcRegistry,
120                          ScheduledExecutorService executor,
121                          short tableOffset,
122                          MacAddress externalRouterMac) {
123         super();
124         this.switchManager = switchManager;
125         this.executor = executor;
126         this.policyResolver = policyResolver;
127         this.dataBroker = dataBroker;
128         this.tableOffset=tableOffset;
129         this.externaMacAddress=externalRouterMac;
130
131
132         if (dataBroker != null) {
133             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
134             t.put(LogicalDatastoreType.OPERATIONAL,
135                   InstanceIdentifier
136                       .builder(SubjectFeatureDefinitions.class)
137                       .build(),
138                   SubjectFeatures.OF_OVERLAY_FEATURES);
139             t.submit();
140         }
141
142         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
143             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
144         }
145
146         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
147                                         this, policyResolver, switchManager,
148                                         endpointManager, executor);
149
150         flowPipeline = ImmutableList.of(new PortSecurity(ctx,TABLEID_PORTSECURITY),
151                                         new GroupTable(ctx),
152                                         new IngressNatMapper(ctx,TABLEID_INGRESS_NAT),
153                                         new SourceMapper(ctx,TABLEID_SOURCE_MAPPER),
154                                         new DestinationMapper(ctx,TABLEID_DESTINATION_MAPPER),
155                                         new PolicyEnforcer(ctx,TABLEID_POLICY_ENFORCER),
156                                         new EgressNatMapper(ctx,TABLEID_EGRESS_NAT),
157                                         new ExternalMapper(ctx,TABLEID_EXTERNAL_MAPPER)
158                                         );
159
160         policyScope = policyResolver.registerListener(this);
161         if (switchManager != null)
162             switchManager.registerListener(this);
163         endpointManager.registerListener(this);
164
165         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
166         scheduleUpdate();
167
168         LOG.debug("Initialized OFOverlay policy manager");
169     }
170
171     // **************
172     // SwitchListener
173     // **************
174
175
176     public short getTABLEID_PORTSECURITY() {
177         return TABLEID_PORTSECURITY;
178     }
179
180
181     public short getTABLEID_INGRESS_NAT() {
182         return TABLEID_INGRESS_NAT;
183     }
184
185
186     public short getTABLEID_SOURCE_MAPPER() {
187         return TABLEID_SOURCE_MAPPER;
188     }
189
190
191     public short getTABLEID_DESTINATION_MAPPER() {
192         return TABLEID_DESTINATION_MAPPER;
193     }
194
195
196     public short getTABLEID_POLICY_ENFORCER() {
197         return TABLEID_POLICY_ENFORCER;
198     }
199
200
201     public short getTABLEID_EGRESS_NAT() {
202         return TABLEID_EGRESS_NAT;
203     }
204
205
206     public short getTABLEID_EXTERNAL_MAPPER() {
207         return TABLEID_EXTERNAL_MAPPER;
208     }
209
210     @Override
211     public void switchReady(final NodeId nodeId) {
212         scheduleUpdate();
213     }
214
215     @Override
216     public void switchRemoved(NodeId sw) {
217         // XXX TODO purge switch flows
218         scheduleUpdate();
219     }
220
221     @Override
222     public void switchUpdated(NodeId sw) {
223         scheduleUpdate();
224     }
225
226     // ****************
227     // EndpointListener
228     // ****************
229
230     @Override
231     public void endpointUpdated(EpKey epKey) {
232         scheduleUpdate();
233     }
234
235     @Override
236     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
237         scheduleUpdate();
238     }
239
240     @Override
241     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
242         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
243         scheduleUpdate();
244     }
245
246     // **************
247     // PolicyListener
248     // **************
249
250     @Override
251     public void policyUpdated(Set<EgKey> updatedConsumers) {
252         scheduleUpdate();
253     }
254
255     // *************
256     // PolicyManager
257     // *************
258
259     /**
260      * Set the learning mode to the specified value
261      * @param learningMode the learning mode to set
262      */
263     public void setLearningMode(LearningMode learningMode) {
264         // No-op for now
265     }
266
267     public static MacAddress getExternaMacAddress() {
268         return externaMacAddress;
269     }
270
271     // **************
272     // Implementation
273     // **************
274
275     public class FlowMap{
276         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
277
278         public FlowMap() {
279         }
280
281         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
282             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
283             if(this.flowMap.get(tableIid) == null) {
284                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
285                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
286             }
287             return this.flowMap.get(tableIid);
288         }
289
290         public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
291             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
292             // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
293             List<Flow> flows = tableBuilder.getFlow();
294             Set<Equivalence.Wrapper<Flow>> wrappedFlows =
295                     new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
296
297             Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
298
299             if (!wrappedFlows.contains(wFlow)) {
300                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
301             } else {
302                 LOG.debug("Flow already exists in FlowMap - {}", flow);
303             }
304         }
305
306         public void commitToDataStore() {
307             if (dataBroker != null) {
308                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
309                     try {
310                         /*
311                          * Get the currently configured flows for
312                          * this table.
313                          */
314                         updateFlowTable(entry);
315                     } catch (Exception e) {
316                         LOG.warn("Couldn't read flow table {}", entry.getKey());
317                     }
318                 }
319             }
320         }
321
322         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
323                                      TableBuilder> entry)  throws Exception {
324             // flows to update
325             Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
326             // flows currently in the table
327             Set<Flow> curr = new HashSet<>();
328
329             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
330             Optional<Table> r =
331                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
332
333             if (r.isPresent()) {
334                 Table currentTable = r.get();
335                 curr = new HashSet<>(currentTable.getFlow());
336             }
337
338             // Sets with custom equivalence rules
339             Set<Equivalence.Wrapper<Flow>> oldFlows =
340                     new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
341             Set<Equivalence.Wrapper<Flow>> updatedFlows =
342                     new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
343
344             // what is still there but was not updated, needs to be deleted
345             Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
346                     Sets.difference(oldFlows, updatedFlows);
347             // new flows (they were not there before)
348             Sets.SetView<Equivalence.Wrapper<Flow>> additions =
349                     Sets.difference(updatedFlows, oldFlows);
350
351             if (!deletions.isEmpty()) {
352                 for (Equivalence.Wrapper<Flow> wf: deletions) {
353                     Flow f = wf.get();
354                     if (f != null) {
355                         t.delete(LogicalDatastoreType.CONFIGURATION,
356                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
357                     }
358                 }
359             }
360             if (!additions.isEmpty()) {
361                 for (Equivalence.Wrapper<Flow> wf: additions) {
362                     Flow f = wf.get();
363                     if (f != null) {
364                         t.put(LogicalDatastoreType.CONFIGURATION,
365                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
366                     }
367                 }
368             }
369             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
370             Futures.addCallback(f, new FutureCallback<Void>() {
371                 @Override
372                 public void onFailure(Throwable t) {
373                     LOG.error("Could not write flow table {}", t);
374                 }
375
376                 @Override
377                 public void onSuccess(Void result) {
378                     LOG.debug("Flow table updated.");
379                 }
380             });
381         }
382
383         private void purgeFromDataStore() {
384             // TODO: tbachman: Remove for Lithium -- this is a workaround
385             //       where some flow-mods aren't getting installed
386             //       on vSwitches when changing L3 contexts
387             WriteTransaction d = dataBroker.newWriteOnlyTransaction();
388
389             for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
390                 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
391             }
392
393             CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
394             Futures.addCallback(fu, new FutureCallback<Void>() {
395                 @Override
396                 public void onFailure(Throwable th) {
397                     LOG.error("Could not write flow table.", th);
398                 }
399
400                 @Override
401                 public void onSuccess(Void result) {
402                     LOG.debug("Flow table updated.");
403                 }
404             });
405         }
406
407     }
408
409     private void scheduleUpdate() {
410         if (switchManager != null) {
411             LOG.trace("Scheduling flow update task");
412             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
413         }
414     }
415
416     /**
417      * Update the flows on a particular switch
418      */
419     private class SwitchFlowUpdateTask implements Callable<Void> {
420         private FlowMap flowMap;
421
422         public SwitchFlowUpdateTask(FlowMap flowMap) {
423             super();
424             this.flowMap = flowMap;
425         }
426
427         @Override
428         public Void call() throws Exception {
429             for (NodeId node : switchManager.getReadySwitches()) {
430                 PolicyInfo info = policyResolver.getCurrentPolicy();
431                 if (info == null)
432                     return null;
433                 for (OfTable table : flowPipeline) {
434                     try {
435                         table.update(node, info, flowMap);
436                     } catch (Exception e) {
437                         LOG.error("Failed to write flow table {}",
438                                 table.getClass().getSimpleName(), e);
439                     }
440                 }
441             }
442             return null;
443         }
444     }
445
446     /**
447      * Update all flows on all switches as needed.  Note that this will block
448      * one of the threads on the executor.
449      */
450     private class FlowUpdateTask implements Runnable {
451         @Override
452         public void run() {
453             LOG.debug("Beginning flow update task");
454
455             CompletionService<Void> ecs
456                 = new ExecutorCompletionService<>(executor);
457             int n = 0;
458
459             FlowMap flowMap = new FlowMap();
460
461             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
462             ecs.submit(swut);
463             n+=1;
464
465             for (int i = 0; i < n; i++) {
466                 try {
467                     ecs.take().get();
468                     flowMap.commitToDataStore();
469                 } catch (InterruptedException | ExecutionException e) {
470                     LOG.error("Failed to update flow tables", e);
471                 }
472             }
473             LOG.debug("Flow update completed");
474         }
475     }
476
477
478
479
480
481 }