Implement SFC integration
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.ArrayList;
12 import java.util.HashSet;
13 import java.util.List;
14 import java.util.Map.Entry;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.CompletionService;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
31 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.Action;
41 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
42 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
43 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
44 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
45 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
46 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
47 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
48 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.common.rev140421.ActionDefinitionId;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import com.google.common.base.Optional;
61 import com.google.common.base.Preconditions;
62 import com.google.common.collect.ImmutableList;
63 import com.google.common.collect.Sets;
64 import com.google.common.util.concurrent.CheckedFuture;
65 import com.google.common.util.concurrent.FutureCallback;
66 import com.google.common.util.concurrent.Futures;
67
68 /**
69  * Manage policies on switches by subscribing to updates from the
70  * policy resolver and information about endpoints from the endpoint
71  * registry
72  */
73 public class PolicyManager
74      implements SwitchListener, PolicyListener, EndpointListener {
75     private static final Logger LOG =
76             LoggerFactory.getLogger(PolicyManager.class);
77
78     private final SwitchManager switchManager;
79     private final PolicyResolver policyResolver;
80
81     private final PolicyScope policyScope;
82
83     private final ScheduledExecutorService executor;
84     private final SingletonTask flowUpdateTask;
85     private final DataBroker dataBroker;
86
87     /**
88      * The flow tables that make up the processing pipeline
89      */
90     private final List<? extends OfTable> flowPipeline;
91
92     /**
93      * The delay before triggering the flow update task in response to an
94      * event in milliseconds.
95      */
96     private final static int FLOW_UPDATE_DELAY = 250;
97
98
99
100     public PolicyManager(DataBroker dataBroker,
101                          PolicyResolver policyResolver,
102                          SwitchManager switchManager,
103                          EndpointManager endpointManager,
104                          RpcProviderRegistry rpcRegistry,
105                          ScheduledExecutorService executor) {
106         super();
107         this.switchManager = switchManager;
108         this.executor = executor;
109         this.policyResolver = policyResolver;
110         this.dataBroker = dataBroker;
111
112
113         if (dataBroker != null) {
114             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
115             t.put(LogicalDatastoreType.OPERATIONAL,
116                   InstanceIdentifier
117                       .builder(SubjectFeatureDefinitions.class)
118                       .build(),
119                   SubjectFeatures.OF_OVERLAY_FEATURES);
120             t.submit();
121         }
122
123         for(Entry<ActionDefinitionId, Action> entry : SubjectFeatures.getActions().entrySet()) {
124             policyResolver.registerActionDefinitions(entry.getKey(), entry.getValue());
125         }
126
127         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
128                                         this, policyResolver, switchManager,
129                                         endpointManager, executor);
130         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
131                                         new GroupTable(ctx),
132                                         new SourceMapper(ctx),
133                                         new DestinationMapper(ctx),
134                                         new PolicyEnforcer(ctx));
135
136         policyScope = policyResolver.registerListener(this);
137         if (switchManager != null)
138             switchManager.registerListener(this);
139         endpointManager.registerListener(this);
140
141         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
142         scheduleUpdate();
143
144         LOG.debug("Initialized OFOverlay policy manager");
145     }
146
147     // **************
148     // SwitchListener
149     // **************
150
151     @Override
152     public void switchReady(final NodeId nodeId) {
153         scheduleUpdate();
154     }
155
156     @Override
157     public void switchRemoved(NodeId sw) {
158         // XXX TODO purge switch flows
159         scheduleUpdate();
160     }
161
162     @Override
163     public void switchUpdated(NodeId sw) {
164         scheduleUpdate();
165     }
166
167     // ****************
168     // EndpointListener
169     // ****************
170
171     @Override
172     public void endpointUpdated(EpKey epKey) {
173         scheduleUpdate();
174     }
175
176     @Override
177     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
178         scheduleUpdate();
179     }
180
181     @Override
182     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
183         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
184         scheduleUpdate();
185     }
186
187     // **************
188     // PolicyListener
189     // **************
190
191     @Override
192     public void policyUpdated(Set<EgKey> updatedConsumers) {
193         scheduleUpdate();
194     }
195
196     // *************
197     // PolicyManager
198     // *************
199
200     /**
201      * Set the learning mode to the specified value
202      * @param learningMode the learning mode to set
203      */
204     public void setLearningMode(LearningMode learningMode) {
205         // No-op for now
206     }
207
208
209
210     // **************
211     // Implementation
212     // **************
213
214     public class FlowMap{
215         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
216
217         public FlowMap() {
218         }
219
220         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
221             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
222             if(this.flowMap.get(tableIid) == null) {
223                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
224                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
225             }
226             return this.flowMap.get(tableIid);
227         }
228
229         public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
230             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
231             if (!tableBuilder.getFlow().contains(flow)) {
232                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
233             }
234         }
235
236         public void commitToDataStore() {
237             if (dataBroker != null) {
238                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
239                     try {
240                         /*
241                          * Get the currently configured flows for
242                          * this table.
243                          */
244                         updateFlowTable(entry);
245                     } catch (Exception e) {
246                         LOG.warn("Couldn't read flow table {}", entry.getKey());
247                     }
248                 }
249             }
250         }
251
252         private void updateFlowTable(Entry<InstanceIdentifier<Table>,
253                                      TableBuilder> entry)  throws Exception {
254             Set<Flow> update = new HashSet<Flow>(entry.getValue().getFlow());
255             Set<Flow> curr = new HashSet<Flow>();
256
257             ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
258             Optional<Table> r =
259                    t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();
260
261             if (r.isPresent()) {
262                 Table curTable = r.get();
263                 curr = new HashSet<Flow>(curTable.getFlow());
264             }
265             Sets.SetView<Flow> deletions = Sets.difference(curr, update);
266             Sets.SetView<Flow> additions = Sets.difference(update, curr);
267             if (!deletions.isEmpty()) {
268                 for (Flow f: deletions) {
269                     t.delete(LogicalDatastoreType.CONFIGURATION,
270                              FlowUtils.createFlowPath(entry.getKey(), f.getId()));
271                 }
272             }
273             if (!additions.isEmpty()) {
274                 for (Flow f: additions) {
275                     t.put(LogicalDatastoreType.CONFIGURATION,
276                           FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
277                 }
278             }
279             CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
280             Futures.addCallback(f, new FutureCallback<Void>() {
281                 @Override
282                 public void onFailure(Throwable t) {
283                     LOG.error("Could not write flow table {}", t);
284                 }
285
286                 @Override
287                 public void onSuccess(Void result) {
288                     LOG.debug("Flow table updated.");
289                 }
290             });
291         }
292
293         private void purgeFromDataStore() {
294             // TODO: tbachman: Remove for Lithium -- this is a workaround
295             //       where some flow-mods aren't getting installed
296             //       on vSwitches when changing L3 contexts
297             WriteTransaction d = dataBroker.newWriteOnlyTransaction();
298
299             for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
300                 d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
301             }
302
303             CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
304             Futures.addCallback(fu, new FutureCallback<Void>() {
305                 @Override
306                 public void onFailure(Throwable th) {
307                     LOG.error("Could not write flow table.", th);
308                 }
309
310                 @Override
311                 public void onSuccess(Void result) {
312                     LOG.debug("Flow table updated.");
313                 }
314             });
315         }
316
317      }
318
319     private void scheduleUpdate() {
320         if (switchManager != null) {
321             LOG.trace("Scheduling flow update task");
322             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
323         }
324     }
325
326     /**
327      * Update the flows on a particular switch
328      */
329     private class SwitchFlowUpdateTask implements Callable<Void> {
330         private FlowMap flowMap;
331
332         public SwitchFlowUpdateTask(FlowMap flowMap) {
333             super();
334             this.flowMap = flowMap;
335         }
336
337         @Override
338         public Void call() throws Exception {
339             for (NodeId node : switchManager.getReadySwitches()) {
340                 PolicyInfo info = policyResolver.getCurrentPolicy();
341                 if (info == null)
342                     return null;
343                 for (OfTable table : flowPipeline) {
344                     try {
345                         table.update(node, info, flowMap);
346                     } catch (Exception e) {
347                         LOG.error("Failed to write flow table {}",
348                                 table.getClass().getSimpleName(), e);
349                     }
350                 }
351             }
352             return null;
353         }
354     }
355
356     /**
357      * Update all flows on all switches as needed.  Note that this will block
358      * one of the threads on the executor.
359      */
360     private class FlowUpdateTask implements Runnable {
361         @Override
362         public void run() {
363             LOG.debug("Beginning flow update task");
364
365             CompletionService<Void> ecs
366                 = new ExecutorCompletionService<Void>(executor);
367             int n = 0;
368
369             FlowMap flowMap = new FlowMap();
370
371             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
372             ecs.submit(swut);
373             n+=1;
374
375             for (int i = 0; i < n; i++) {
376                 try {
377                     ecs.take().get();
378                     flowMap.commitToDataStore();
379                 } catch (InterruptedException | ExecutionException e) {
380                     LOG.error("Failed to update flow tables", e);
381                 }
382             }
383             LOG.debug("Flow update completed");
384         }
385     }
386
387
388
389
390
391 }