Merge "Bug 3281 duplicate flow programming"
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import com.google.common.base.Equivalence;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Collections2;
15 import com.google.common.collect.ImmutableList;
16 import com.google.common.collect.Sets;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
27 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
28 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
29 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
39 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
43 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
44 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
52 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.util.ArrayList;
57 import java.util.HashSet;
58 import java.util.List;
59 import java.util.Map.Entry;
60 import java.util.Set;
61 import java.util.concurrent.Callable;
62 import java.util.concurrent.CompletionService;
63 import java.util.concurrent.ConcurrentHashMap;
64 import java.util.concurrent.ConcurrentMap;
65 import java.util.concurrent.ExecutionException;
66 import java.util.concurrent.ExecutorCompletionService;
67 import java.util.concurrent.ScheduledExecutorService;
68 import java.util.concurrent.TimeUnit;
69
70 /**
71  * Manage policies on switches by subscribing to updates from the
72  * policy resolver and information about endpoints from the endpoint
73  * registry
74  */
75 public class PolicyManager
76      implements SwitchListener, PolicyListener, EndpointListener {
77     private static final Logger LOG =
78             LoggerFactory.getLogger(PolicyManager.class);
79
80     private final SwitchManager switchManager;
81     private final PolicyResolver policyResolver;
82
83     private final PolicyScope policyScope;
84
85     private final ScheduledExecutorService executor;
86     private final SingletonTask flowUpdateTask;
87     private final DataBroker dataBroker;
88
89     /**
90      * The flow tables that make up the processing pipeline
91      */
92     private final List<? extends OfTable> flowPipeline;
93
94     /**
95      * The delay before triggering the flow update task in response to an
96      * event in milliseconds.
97      */
98     private final static int FLOW_UPDATE_DELAY = 250;
99
100     public PolicyManager(DataBroker dataBroker,
101                          PolicyResolver policyResolver,
102                          SwitchManager switchManager,
103                          EndpointManager endpointManager,
104                          RpcProviderRegistry rpcRegistry,
105                          ScheduledExecutorService executor) {
106         super();
107         this.switchManager = switchManager;
108         this.executor = executor;
109         this.policyResolver = policyResolver;
110         this.dataBroker = dataBroker;
111
112
113         if (dataBroker != null) {
114             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
115             t.put(LogicalDatastoreType.OPERATIONAL,
116                   InstanceIdentifier
117                       .builder(SubjectFeatureDefinitions.class)
118                       .build(),
119                   SubjectFeatures.OF_OVERLAY_FEATURES);
120             t.submit();
121         }
122
123         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
124             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
125         }
126
127         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
128                                         this, policyResolver, switchManager,
129                                         endpointManager, executor);
130         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
131                                         new GroupTable(ctx),
132                                         new SourceMapper(ctx),
133                                         new DestinationMapper(ctx),
134                                         new PolicyEnforcer(ctx));
135
136         policyScope = policyResolver.registerListener(this);
137         if (switchManager != null)
138             switchManager.registerListener(this);
139         endpointManager.registerListener(this);
140
141         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
142         scheduleUpdate();
143
144         LOG.debug("Initialized OFOverlay policy manager");
145     }
146
147     // **************
148     // SwitchListener
149     // **************
150
151     @Override
152     public void switchReady(final NodeId nodeId) {
153         scheduleUpdate();
154     }
155
156     @Override
157     public void switchRemoved(NodeId sw) {
158         // XXX TODO purge switch flows
159         scheduleUpdate();
160     }
161
162     @Override
163     public void switchUpdated(NodeId sw) {
164         scheduleUpdate();
165     }
166
167     // ****************
168     // EndpointListener
169     // ****************
170
171     @Override
172     public void endpointUpdated(EpKey epKey) {
173         scheduleUpdate();
174     }
175
176     @Override
177     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
178         scheduleUpdate();
179     }
180
181     @Override
182     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
183         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
184         scheduleUpdate();
185     }
186
187     // **************
188     // PolicyListener
189     // **************
190
191     @Override
192     public void policyUpdated(Set<EgKey> updatedConsumers) {
193         scheduleUpdate();
194     }
195
196     // *************
197     // PolicyManager
198     // *************
199
200     /**
201      * Set the learning mode to the specified value
202      * @param learningMode the learning mode to set
203      */
204     public void setLearningMode(LearningMode learningMode) {
205         // No-op for now
206     }
207
208
209
210     // **************
211     // Implementation
212     // **************
213
214     public class FlowMap{
215         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
216
217         public FlowMap() {
218         }
219
220         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
221             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
222             if(this.flowMap.get(tableIid) == null) {
223                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
224                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
225             }
226             return this.flowMap.get(tableIid);
227         }
228
229         public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
230             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
231             // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
232             List<Flow> flows = tableBuilder.getFlow();
233             Set<Equivalence.Wrapper<Flow>> wrappedFlows =
234                     new HashSet<>(Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
235
236             Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
237
238             if (!wrappedFlows.contains(wFlow)) {
239                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
240             }
241         }
242
243         public void commitToDataStore() {
244             if (dataBroker != null) {
245                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
246                     try {
247                         /*
248                          * Get the currently configured flows for
249                          * this table.
250                          */
251                         updateFlowTable(entry);
252                     } catch (Exception e) {
253                         LOG.warn("Couldn't read flow table {}", entry.getKey());
254                     }
255                 }
256             }
257         }
258
259         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
260                                      TableBuilder> entry)  throws Exception {
261             // flows to update
262             Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
263             // flows currently in the table
264             Set<Flow> curr = new HashSet<>();
265
266             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
267             Optional<Table> r =
268                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
269
270             if (r.isPresent()) {
271                 Table currentTable = r.get();
272                 curr = new HashSet<>(currentTable.getFlow());
273             }
274
275             // Sets with custom equivalence rules
276             Set<Equivalence.Wrapper<Flow>> oldFlows =
277                     new HashSet<>(Collections2.transform(curr, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
278             Set<Equivalence.Wrapper<Flow>> updatedFlows =
279                     new HashSet<>(Collections2.transform(update, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
280
281             // what is still there but was not updated, needs to be deleted
282             Sets.SetView<Equivalence.Wrapper<Flow>> deletions =
283                     Sets.difference(oldFlows, updatedFlows);
284             // new flows (they were not there before)
285             Sets.SetView<Equivalence.Wrapper<Flow>> additions =
286                     Sets.difference(updatedFlows, oldFlows);
287
288             if (!deletions.isEmpty()) {
289                 for (Equivalence.Wrapper<Flow> wf: deletions) {
290                     Flow f = wf.get();
291                     if (f != null) {
292                         t.delete(LogicalDatastoreType.CONFIGURATION,
293                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()));
294                     }
295                 }
296             }
297             if (!additions.isEmpty()) {
298                 for (Equivalence.Wrapper<Flow> wf: additions) {
299                     Flow f = wf.get();
300                     if (f != null) {
301                         t.put(LogicalDatastoreType.CONFIGURATION,
302                                 FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
303                     }
304                 }
305             }
306             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
307             Futures.addCallback(f, new FutureCallback<Void>() {
308                 @Override
309                 public void onFailure(Throwable t) {
310                     LOG.error("Could not write flow table {}", t);
311                 }
312
313                 @Override
314                 public void onSuccess(Void result) {
315                     LOG.debug("Flow table updated.");
316                 }
317             });
318         }
319
320         private void purgeFromDataStore() {
321             // TODO: tbachman: Remove for Lithium -- this is a workaround
322             //       where some flow-mods aren't getting installed
323             //       on vSwitches when changing L3 contexts
324             WriteTransaction d = dataBroker.newWriteOnlyTransaction();
325
326             for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
327                 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
328             }
329
330             CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
331             Futures.addCallback(fu, new FutureCallback<Void>() {
332                 @Override
333                 public void onFailure(Throwable th) {
334                     LOG.error("Could not write flow table.", th);
335                 }
336
337                 @Override
338                 public void onSuccess(Void result) {
339                     LOG.debug("Flow table updated.");
340                 }
341             });
342         }
343
344     }
345
346     private void scheduleUpdate() {
347         if (switchManager != null) {
348             LOG.trace("Scheduling flow update task");
349             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
350         }
351     }
352
353     /**
354      * Update the flows on a particular switch
355      */
356     private class SwitchFlowUpdateTask implements Callable<Void> {
357         private FlowMap flowMap;
358
359         public SwitchFlowUpdateTask(FlowMap flowMap) {
360             super();
361             this.flowMap = flowMap;
362         }
363
364         @Override
365         public Void call() throws Exception {
366             for (NodeId node : switchManager.getReadySwitches()) {
367                 PolicyInfo info = policyResolver.getCurrentPolicy();
368                 if (info == null)
369                     return null;
370                 for (OfTable table : flowPipeline) {
371                     try {
372                         table.update(node, info, flowMap);
373                     } catch (Exception e) {
374                         LOG.error("Failed to write flow table {}",
375                                 table.getClass().getSimpleName(), e);
376                     }
377                 }
378             }
379             return null;
380         }
381     }
382
383     /**
384      * Update all flows on all switches as needed.  Note that this will block
385      * one of the threads on the executor.
386      */
387     private class FlowUpdateTask implements Runnable {
388         @Override
389         public void run() {
390             LOG.debug("Beginning flow update task");
391
392             CompletionService<Void> ecs
393                 = new ExecutorCompletionService<>(executor);
394             int n = 0;
395
396             FlowMap flowMap = new FlowMap();
397
398             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
399             ecs.submit(swut);
400             n+=1;
401
402             for (int i = 0; i < n; i++) {
403                 try {
404                     ecs.take().get();
405                     flowMap.commitToDataStore();
406                 } catch (InterruptedException | ExecutionException e) {
407                     LOG.error("Failed to update flow tables", e);
408                 }
409             }
410             LOG.debug("Flow update completed");
411         }
412     }
413
414
415
416
417
418 }