Merge "Bug#2977 EndpointManager maps are not threadsafe"
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / PolicyManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import java.util.ArrayList;
12 import java.util.List;
13 import java.util.Map.Entry;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorCompletionService;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
29 import org.opendaylight.groupbasedpolicy.endpoint.EpKey;
30 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.DestinationMapper;
31 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
32 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.GroupTable;
33 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.OfTable;
34 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PolicyEnforcer;
35 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.PortSecurity;
36 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.SourceMapper;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.sf.SubjectFeatures;
38 import org.opendaylight.groupbasedpolicy.resolver.EgKey;
39 import org.opendaylight.groupbasedpolicy.resolver.PolicyInfo;
40 import org.opendaylight.groupbasedpolicy.resolver.PolicyListener;
41 import org.opendaylight.groupbasedpolicy.resolver.PolicyResolver;
42 import org.opendaylight.groupbasedpolicy.resolver.PolicyScope;
43 import org.opendaylight.groupbasedpolicy.util.SingletonTask;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.ofoverlay.rev140528.OfOverlayConfig.LearningMode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.policy.rev140421.SubjectFeatureDefinitions;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.google.common.base.Preconditions;
55 import com.google.common.collect.ImmutableList;
56 import com.google.common.util.concurrent.CheckedFuture;
57 import com.google.common.util.concurrent.FutureCallback;
58 import com.google.common.util.concurrent.Futures;
59
60 /**
61  * Manage policies on switches by subscribing to updates from the
62  * policy resolver and information about endpoints from the endpoint
63  * registry
64  */
65 public class PolicyManager
66      implements SwitchListener, PolicyListener, EndpointListener {
67     private static final Logger LOG =
68             LoggerFactory.getLogger(PolicyManager.class);
69
70     private final SwitchManager switchManager;
71     private final PolicyResolver policyResolver;
72
73     private final PolicyScope policyScope;
74
75     private final ScheduledExecutorService executor;
76     private final SingletonTask flowUpdateTask;
77     private final DataBroker dataBroker;
78
79     /**
80      * The flow tables that make up the processing pipeline
81      */
82     private final List<? extends OfTable> flowPipeline;
83
84     /**
85      * The delay before triggering the flow update task in response to an
86      * event in milliseconds.
87      */
88     private final static int FLOW_UPDATE_DELAY = 250;
89
90
91
92     public PolicyManager(DataBroker dataBroker,
93                          PolicyResolver policyResolver,
94                          SwitchManager switchManager,
95                          EndpointManager endpointManager,
96                          RpcProviderRegistry rpcRegistry,
97                          ScheduledExecutorService executor) {
98         super();
99         this.switchManager = switchManager;
100         this.executor = executor;
101         this.policyResolver = policyResolver;
102         this.dataBroker = dataBroker;
103
104
105         if (dataBroker != null) {
106             WriteTransaction t = dataBroker.newWriteOnlyTransaction();
107             t.put(LogicalDatastoreType.OPERATIONAL,
108                   InstanceIdentifier
109                       .builder(SubjectFeatureDefinitions.class)
110                       .build(),
111                   SubjectFeatures.OF_OVERLAY_FEATURES);
112             t.submit();
113         }
114
115         OfContext ctx = new OfContext(dataBroker, rpcRegistry,
116                                         this, policyResolver, switchManager,
117                                         endpointManager, executor);
118         flowPipeline = ImmutableList.of(new PortSecurity(ctx),
119                                         new GroupTable(ctx),
120                                         new SourceMapper(ctx),
121                                         new DestinationMapper(ctx),
122                                         new PolicyEnforcer(ctx));
123
124         policyScope = policyResolver.registerListener(this);
125         if (switchManager != null)
126             switchManager.registerListener(this);
127         endpointManager.registerListener(this);
128
129         flowUpdateTask = new SingletonTask(executor, new FlowUpdateTask());
130         scheduleUpdate();
131
132         LOG.debug("Initialized OFOverlay policy manager");
133     }
134
135     // **************
136     // SwitchListener
137     // **************
138
139     @Override
140     public void switchReady(final NodeId nodeId) {
141         //TODO Apr15 alagalah : OVSDB CRUD tunnels may go here.
142 //        WriteTransaction t = dataBroker.newWriteOnlyTransaction();
143 //
144 //        NodeBuilder nb = new NodeBuilder()
145 //            .setId(nodeId)
146 //            .addAugmentation(FlowCapableNode.class,
147 //                             new FlowCapableNodeBuilder()
148 //                                .build());
149 //        t.merge(LogicalDatastoreType.CONFIGURATION,
150 //                FlowUtils.createNodePath(nodeId),
151 //                nb.build(), true);
152 //        ListenableFuture<Void> result = t.submit();
153 //        Futures.addCallback(result,
154 //                            new FutureCallback<Void>() {
155 //            @Override
156 //            public void onSuccess(Void result) {
157 //                dirty.get().addNode(nodeId);
158 //                scheduleUpdate();
159 //            }
160 //
161 //            @Override
162 //            public void onFailure(Throwable t) {
163 //                LOG.error("Could not add switch {}", nodeId, t);
164 //            }
165 //        });
166
167     }
168
169     @Override
170     public void switchRemoved(NodeId sw) {
171         // XXX TODO purge switch flows
172         scheduleUpdate();
173     }
174
175     @Override
176     public void switchUpdated(NodeId sw) {
177         scheduleUpdate();
178     }
179
180     // ****************
181     // EndpointListener
182     // ****************
183
184     @Override
185     public void endpointUpdated(EpKey epKey) {
186         scheduleUpdate();
187     }
188
189     @Override
190     public void nodeEndpointUpdated(NodeId nodeId, EpKey epKey){
191         scheduleUpdate();
192     }
193
194     @Override
195     public void groupEndpointUpdated(EgKey egKey, EpKey epKey) {
196         policyScope.addToScope(egKey.getTenantId(), egKey.getEgId());
197         scheduleUpdate();
198     }
199
200     // **************
201     // PolicyListener
202     // **************
203
204     @Override
205     public void policyUpdated(Set<EgKey> updatedConsumers) {
206         scheduleUpdate();
207     }
208
209     // *************
210     // PolicyManager
211     // *************
212
213     /**
214      * Set the learning mode to the specified value
215      * @param learningMode the learning mode to set
216      */
217     public void setLearningMode(LearningMode learningMode) {
218         // No-op for now
219     }
220
221
222
223     // **************
224     // Implementation
225     // **************
226
227     public class FlowMap{
228         private ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap = new ConcurrentHashMap<>();
229
230         public FlowMap() {
231         }
232
233         public TableBuilder getTableForNode(NodeId nodeId, short tableId) {
234             InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
235             if(this.flowMap.get(tableIid) == null) {
236                 this.flowMap.put(tableIid, new TableBuilder().setId(tableId));
237                 this.flowMap.get(tableIid).setFlow(new ArrayList<Flow>());
238             }
239             return this.flowMap.get(tableIid);
240         }
241
242         public void writeFlow(NodeId nodeId,short tableId, Flow flow) {
243             TableBuilder tableBuilder = this.getTableForNode(nodeId, tableId);
244             if (!tableBuilder.getFlow().contains(flow)) {
245                 tableBuilder.getFlow().add(Preconditions.checkNotNull(flow));
246             }
247         }
248
249         public void commitToDataStore() {
250             if (dataBroker != null) {
251                 WriteTransaction t = dataBroker.newWriteOnlyTransaction();
252
253                 for( Entry<InstanceIdentifier<Table>, TableBuilder> entry : flowMap.entrySet()) {
254                     t.put(LogicalDatastoreType.CONFIGURATION,
255                           entry.getKey(), entry.getValue().build(),true);
256                 }
257
258                 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
259                 Futures.addCallback(f, new FutureCallback<Void>() {
260                     @Override
261                     public void onFailure(Throwable t) {
262                         LOG.error("Could not write flow table.", t);
263                     }
264
265                     @Override
266                     public void onSuccess(Void result) {
267                         LOG.debug("Flow table updated.");
268                     }
269                 });
270             }
271         }
272
273      }
274
275     private void scheduleUpdate() {
276         if (switchManager != null) {
277             LOG.trace("Scheduling flow update task");
278             flowUpdateTask.reschedule(FLOW_UPDATE_DELAY, TimeUnit.MILLISECONDS);
279         }
280     }
281
282     /**
283      * Update the flows on a particular switch
284      */
285     private class SwitchFlowUpdateTask implements Callable<Void> {
286         private FlowMap flowMap;
287
288         public SwitchFlowUpdateTask(FlowMap flowMap) {
289             super();
290             this.flowMap = flowMap;
291         }
292
293         @Override
294         public Void call() throws Exception {
295             for (NodeId node : switchManager.getReadySwitches()) {
296                 if (!switchManager.isSwitchReady(node))
297                     return null;
298                 PolicyInfo info = policyResolver.getCurrentPolicy();
299                 if (info == null)
300                     return null;
301                 for (OfTable table : flowPipeline) {
302                     try {
303                         table.update(node, info, flowMap);
304                     } catch (Exception e) {
305                         LOG.error("Failed to write flow table {}",
306                                 table.getClass().getSimpleName(), e);
307                     }
308                 }
309             }
310             return null;
311         }
312     }
313
314     /**
315      * Update all flows on all switches as needed.  Note that this will block
316      * one of the threads on the executor.
317      */
318     private class FlowUpdateTask implements Runnable {
319         @Override
320         public void run() {
321             LOG.debug("Beginning flow update task");
322
323             CompletionService<Void> ecs
324                 = new ExecutorCompletionService<Void>(executor);
325             int n = 0;
326
327             FlowMap flowMap = new FlowMap();
328
329             SwitchFlowUpdateTask swut = new SwitchFlowUpdateTask(flowMap);
330             ecs.submit(swut);
331             n+=1;
332
333             for (int i = 0; i < n; i++) {
334                 try {
335                     ecs.take().get();
336                     flowMap.commitToDataStore();
337                 } catch (InterruptedException | ExecutionException e) {
338                     LOG.error("Failed to update flow tables", e);
339                 }
340             }
341             LOG.debug("Flow update completed");
342         }
343     }
344
345
346
347
348
349 }