/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchListener;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.node.SwitchManager;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
import org.opendaylight.groupbasedpolicy.resolver.EgKey;
import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
import org.opendaylight.groupbasedpolicy.util.SingletonTask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
/**
 * Manage policies on switches by subscribing to updates from the
 * policy resolver and information about endpoints from the endpoint
 * registry.
 */
public class PolicyManager
        implements SwitchListener, PolicyListener, EndpointListener {
    private static final Logger LOG =
            LoggerFactory.getLogger(PolicyManager.class);

    private final SwitchManager switchManager;
    private final PolicyResolver policyResolver;

    private final PolicyScope policyScope;

    private final ScheduledExecutorService executor;
    private final SingletonTask flowUpdateTask;
    private final DataBroker dataBroker;

    /**
     * The flow tables that make up the processing pipeline
     */
    private final List<? extends OfTable> flowPipeline;

    /**
     * The delay, in milliseconds, before triggering the flow update task
     * in response to an event.
     */
    private static final int FLOW_UPDATE_DELAY = 250;

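    /**
     * Construct a new policy manager and wire it to the policy resolver,
     * switch manager, and endpoint manager so that a flow update is
     * scheduled whenever policy, switch, or endpoint state changes.
     *
     * <p>A rough usage sketch (the surrounding wiring is illustrative only;
     * in practice these dependencies are supplied by the renderer's
     * provider/activator code):
     * <pre>{@code
     * PolicyManager policyManager =
     *     new PolicyManager(dataBroker, policyResolver, switchManager,
     *                       endpointManager, rpcRegistry, executor);
     * }</pre>
     *
     * @param dataBroker the MD-SAL data broker used to read and write flow tables
     * @param policyResolver the resolver that supplies resolved policy
     * @param switchManager the manager tracking ready switches
     * @param endpointManager the manager tracking known endpoints
     * @param rpcRegistry the RPC registry passed to the renderer context
     * @param executor the executor used for flow update tasks
     */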
    public PolicyManager(DataBroker dataBroker,
                         PolicyResolver policyResolver,
                         SwitchManager switchManager,
                         EndpointManager endpointManager,
                         RpcProviderRegistry rpcRegistry,
                         ScheduledExecutorService executor) {
        this.switchManager = switchManager;
        this.executor = executor;
        this.policyResolver = policyResolver;
        this.dataBroker = dataBroker;

        if (dataBroker != null) {
            WriteTransaction t = dataBroker.newWriteOnlyTransaction();
            t.put(LogicalDatastoreType.OPERATIONAL,
                  InstanceIdentifier
                      .builder(SubjectFeatureDefinitions.class)
                      .build(),
                  SubjectFeatures.OF_OVERLAY_FEATURES);
            t.submit();
        }

        OfContext ctx = new OfContext(dataBroker, rpcRegistry,
                                      this, policyResolver, switchManager,
                                      endpointManager, executor);
        flowPipeline = ImmutableList.of(new PortSecurity(ctx),
                                        new GroupTable(ctx),
                                        new SourceMapper(ctx),
                                        new DestinationMapper(ctx),
                                        new PolicyEnforcer(ctx));

        policyScope = policyResolver.registerListener(this);
        if (switchManager != null) {
            switchManager.registerListener(this);
        }
        endpointManager.registerListener(this);

        flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
        scheduleUpdate();

        LOG.debug("Initialized OFOverlay policy manager");
    }

    // **************
    // SwitchListener
    // **************

    @Override
    public void switchReady(final NodeId nodeId) {
        scheduleUpdate();
    }

    @Override
    public void switchRemoved(NodeId sw) {
        // XXX TODO purge switch flows
        scheduleUpdate();
    }

    @Override
    public void switchUpdated(NodeId sw) {
        scheduleUpdate();
    }

    // ****************
    // EndpointListener
    // ****************

    @Override
    public void endpointUpdated(EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey) {
        scheduleUpdate();
    }

    @Override
    public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
        policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
        scheduleUpdate();
    }

    // **************
    // PolicyListener
    // **************

    @Override
    public void policyUpdated(Set<EgKey> updatedConsumers) {
        scheduleUpdate();
    }

    // *************
    // PolicyManager
    // *************

    /**
     * Set the learning mode to the specified value
     * @param learningMode the learning mode to set
     */
    public void setLearningMode(LearningMode learningMode) {
        // No-op for now
    }

    // **************
    // Implementation
    // **************

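    /**
     * Accumulates the desired flow entries for each OpenFlow table while the
     * pipeline is being evaluated, then reconciles them against the
     * configuration datastore in a single pass.
     *
     * <p>Typical use within a flow update cycle (variable names shown are
     * placeholders):
     * <pre>{@code
     * FlowMap flowMap = new FlowMap();
     * flowMap.writeFlow(nodeId, tableId, flow);
     * flowMap.commitToDataStore();
     * }</pre>
     */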
    public class FlowMap {
        private final ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();

        public FlowMap() {
        }

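        /**
         * Get the builder for the given table on the given node, creating an
         * empty table builder (with an empty flow list) on first access.
         *
         * @param nodeId the switch whose table is requested
         * @param tableId the OpenFlow table id
         * @return the {@link TableBuilder} that accumulates flows for the table
         */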
        public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
            InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
            TableBuilder tableBuilder = flowMap.get(tableIid);
            if (tableBuilder == null) {
                tableBuilder = new TableBuilder()
                        .setId(tableId)
                        .setFlow(new ArrayList<Flow>());
                flowMap.put(tableIid, tableBuilder);
            }
            return tableBuilder;
        }

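        /**
         * Add a flow to the table builder for the given node and table,
         * skipping flows that are already present.
         *
         * @param nodeId the switch to write the flow for
         * @param tableId the OpenFlow table id
         * @param flow the flow to add; must not be null
         */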
        public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
            TableBuilder tableBuilder = getTableForNode(nodeId, tableId);
            if (!tableBuilder.getFlow().contains(flow)) {
                tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
            }
        }

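        /**
         * Reconcile every table accumulated in this map against the
         * configuration datastore, deleting flows that are no longer desired
         * and adding flows that are missing. A no-op when no data broker is
         * available.
         */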
        public void commitToDataStore() {
            if (dataBroker != null) {
                for (Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
                    try {
                        /*
                         * Reconcile the currently configured flows for
                         * this table with the desired flows.
                         */
                        updateFlowTable(entry);
                    } catch (Exception e) {
                        LOG.warn("Couldn't update flow table {}", entry.getKey(), e);
                    }
                }
            }
        }

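        /**
         * Compute the difference between the flows currently configured for
         * this table and the desired flows in the supplied entry, then delete
         * stale flows and add missing ones in a single read-write transaction.
         *
         * @param entry the table path and the desired state for that table
         * @throws Exception if the current table state cannot be read
         */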
        private void updateFlowTable(Entry<InstanceIdentifier<Table>,
                                     TableBuilder> entry) throws Exception {
            Set<Flow> update = new HashSet<>(entry.getValue().getFlow());
            Set<Flow> curr = new HashSet<>();

            ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
            Optional<Table> r =
                   t.read(LogicalDatastoreType.CONFIGURATION, entry.getKey()).get();

            if (r.isPresent()) {
                Table curTable = r.get();
                curr = new HashSet<>(curTable.getFlow());
            }
            Sets.SetView<Flow> deletions = Sets.difference(curr, update);
            Sets.SetView<Flow> additions = Sets.difference(update, curr);
            for (Flow f : deletions) {
                t.delete(LogicalDatastoreType.CONFIGURATION,
                         FlowUtils.createFlowPath(entry.getKey(), f.getId()));
            }
            for (Flow f : additions) {
                t.put(LogicalDatastoreType.CONFIGURATION,
                      FlowUtils.createFlowPath(entry.getKey(), f.getId()), f, true);
            }
            CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
            Futures.addCallback(f, new FutureCallback<Void>() {
                @Override
                public void onFailure(Throwable th) {
                    LOG.error("Could not write flow table", th);
                }

                @Override
                public void onSuccess(Void result) {
                    LOG.debug("Flow table updated.");
                }
            });
        }

        private void purgeFromDataStore() {
            // TODO: tbachman: Remove for Lithium -- this is a workaround
            //       where some flow-mods aren't getting installed
            //       on vSwitches when changing L3 contexts
            WriteTransaction d = dataBroker.newWriteOnlyTransaction();

            for (Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
                d.delete(LogicalDatastoreType.CONFIGURATION, entry.getKey());
            }

            CheckedFuture<Void, TransactionCommitFailedException> fu = d.submit();
            Futures.addCallback(fu, new FutureCallback<Void>() {
                @Override
                public void onFailure(Throwable th) {
                    LOG.error("Could not purge flow tables.", th);
                }

                @Override
                public void onSuccess(Void result) {
                    LOG.debug("Flow tables purged.");
                }
            });
        }

    }

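    /**
     * Schedule the flow update task to run after {@code FLOW_UPDATE_DELAY}
     * milliseconds. No update is scheduled when no switch manager is
     * available.
     */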
    private void scheduleUpdate() {
        if (switchManager != null) {
            LOG.trace("Scheduling flow update task");
            flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Update the flows on all ready switches
     */
    private class SwitchFlowUpdateTask implements Callable<Void> {
        private final FlowMap flowMap;

        public SwitchFlowUpdateTask(FlowMap flowMap) {
            this.flowMap = flowMap;
        }

        @Override
        public Void call() throws Exception {
            PolicyInfo info = policyResolver.getCurrentPolicy();
            if (info == null) {
                return null;
            }
            for (NodeId node : switchManager.getReadySwitches()) {
                for (OfTable table : flowPipeline) {
                    try {
                        table.update(node, info, flowMap);
                    } catch (Exception e) {
                        LOG.error("Failed to write flow table {}",
                                table.getClass().getSimpleName(), e);
                    }
                }
            }
            return null;
        }
    }

    /**
     * Update all flows on all switches as needed.  Note that this will block
     * one of the threads on the executor.
     */
    private class FlowUpdateTask implements Runnable {
        @Override
        public void run() {
            LOG.debug("Beginning flow update task");

            CompletionService<Void> ecs
                = new ExecutorCompletionService<Void>(executor);
            int n = 0;

            FlowMap flowMap = new FlowMap();

            SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
            ecs.submit(swut);
            n += 1;

            for (int i = 0; i < n; i++) {
                try {
                    ecs.take().get();
                    flowMap.commitToDataStore();
                } catch (InterruptedException | ExecutionException e) {
                    LOG.error("Failed to update flow tables", e);
                }
            }
            LOG.debug("Flow update completed");
        }
    }

}