update deprecated transform and addCallback methods
[groupbasedpolicy.git] / renderers / ofoverlay / src / main / java / org / opendaylight / groupbasedpolicy / renderer / ofoverlay / OfWriter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
10
11 import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createGroupPath;
12 import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createNodePath;
13
14 import javax.annotation.Nullable;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
24
25 import com.google.common.base.Equivalence;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.collect.Collections2;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.FutureCallback;
32 import com.google.common.util.concurrent.Futures;
33 import com.google.common.util.concurrent.MoreExecutors;
34
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
37 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
38 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
39 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
40 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.BucketsBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public class OfWriter {
59
60     private final ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap =
61             new ConcurrentHashMap<>();
62     private final ConcurrentMap<InstanceIdentifier<Group>, GroupBuilder> groupByIid =
63             new ConcurrentHashMap<>();
64     private final ConcurrentMap<NodeId, Set<GroupId>> groupIdsByNode = new ConcurrentHashMap<>();
65
66     private static final Logger LOG = LoggerFactory.getLogger(OfWriter.class);
67
68     public Table getTableForNode(NodeId nodeId, short tableId) {
69         return getTableBuilderForNode(nodeId, tableId).build();
70     }
71
72     private TableBuilder getTableBuilderForNode(NodeId nodeId, short tableId) {
73         InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
74         if (this.flowMap.get(tableIid) == null) {
75             this.flowMap.put(tableIid,
76                     new TableBuilder().setId(tableId).setFlow(new ArrayList<Flow>()));
77         }
78         return this.flowMap.get(tableIid);
79     }
80
81     public boolean groupExists(NodeId nodeId, long groupId) {
82         return (getGroupForNode(nodeId, groupId) != null);
83     }
84
85     /**
86      * Gets group (or null if group does not exist) for node
87      *
88      * @param nodeId  NodeId
89      * @param groupId long
90      * @return Group or null
91      */
92     private Group getGroupForNode(NodeId nodeId, long groupId) {
93         InstanceIdentifier<Group> giid = FlowUtils.createGroupPath(nodeId, groupId);
94         if (this.groupByIid.get(giid) == null) {
95             return null;
96         }
97         return this.groupByIid.get(giid).build();
98     }
99
100     /**
101      * Short form of {@link #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)} with default parameters:<br>
102      * groupTypes = {@code GroupTypes.GroupAll}<br>
103      * containerName = null<br>
104      * groupName = null<br>
105      * barrier = null
106      *
107      * @param nodeId     NodeId
108      * @param groupId    GroupId
109      * @see OfWriter#writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
110      */
111     public void writeGroup(NodeId nodeId, GroupId groupId) {
112         writeGroup(nodeId, groupId, GroupTypes.GroupAll, null, null, null);
113     }
114
115     /**
116      * Writes a new group for OVS
117      *
118      * @param nodeId        NodeId
119      * @param groupId       GroupId
120      * @param groupTypes    GroupTypes
121      * @param containerName String
122      * @param groupName     String
123      * @param barrier       Boolean
124      */
125     public void writeGroup(NodeId nodeId, GroupId groupId, @Nullable GroupTypes groupTypes,
126             @Nullable String containerName, @Nullable String groupName,
127             @Nullable Boolean barrier) {
128         Preconditions.checkNotNull(nodeId);
129         Preconditions.checkNotNull(groupId);
130
131         GroupBuilder gb = new GroupBuilder().setGroupId(groupId)
132                 .setBarrier(barrier)
133                 .setContainerName(containerName)
134                 .setGroupName(groupName)
135                 .setGroupType(groupTypes)
136                 .setBuckets(new BucketsBuilder().setBucket(new ArrayList<Bucket>()).build());
137
138         groupByIid.put(FlowUtils.createGroupPath(nodeId, groupId), gb);
139         if (this.groupIdsByNode.get(nodeId) == null) {
140             this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
141         }
142         this.groupIdsByNode.get(nodeId).add(groupId);
143     }
144
145     /**
146      * Writes a Bucket to Group.<br>
147      * Group has to be created previously,<br>
148      * or an IllegalStateException will be thrown.
149      *
150      * @param nodeId  NodeId
151      * @param groupId GroupId
152      * @param bucket  Bucket to be added to group
153      * @throws IllegalStateException if the Group is absent
154      * @see #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
155      * @see #writeGroup(NodeId, GroupId)
156      */
157     public void writeBucket(NodeId nodeId, GroupId groupId, Bucket bucket) {
158         Preconditions.checkNotNull(nodeId);
159         Preconditions.checkNotNull(groupId);
160         Preconditions.checkNotNull(bucket);
161
162         GroupBuilder gb = groupByIid.get(FlowUtils.createGroupPath(nodeId, groupId));
163         if (gb != null) {
164             gb.getBuckets().getBucket().add(bucket);
165         } else {
166             LOG.error("Group {} on node {} does not exist", groupId, nodeId);
167             throw new IllegalStateException();
168         }
169     }
170
171     public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
172         Preconditions.checkNotNull(flow);
173         Preconditions.checkNotNull(nodeId);
174
175         if (flow.getMatch() == null) {
176             flow = new FlowBuilder(flow).setMatch(new MatchBuilder().build()).build();
177         }
178         TableBuilder tableBuilder = this.getTableBuilderForNode(nodeId, tableId);
179         // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
180         List<Flow> flows = tableBuilder.getFlow();
181         Set<Equivalence.Wrapper<Flow>> wrappedFlows = new HashSet<>(
182                 Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
183
184         Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
185
186         if (!wrappedFlows.contains(wFlow)) {
187             tableBuilder.getFlow().add(flow);
188         } else {
189             LOG.debug("Flow already exists in OfData - {}", flow);
190         }
191     }
192
193     /**
194      * Update groups and flows on every node
195      * Only flows created by gbp - which are present in actualFlowMap - can be removed. It ensures no other flows
196      * are deleted
197      * Newly created flows are returned and will be used as actual in next update
198      *
199      * @param actualFlowMap map of flows which are currently present on all nodes
200      * @return map of newly created flows. These flows will be "actual" in next update
201      */
202     public Map<InstanceIdentifier<Table>, TableBuilder> commitToDataStore(DataBroker dataBroker,
203                                                                           Map<InstanceIdentifier<Table>, TableBuilder> actualFlowMap) {
204         Map<InstanceIdentifier<Table>, TableBuilder> actualFlows = new HashMap<>();
205         if (dataBroker != null) {
206
207             for (NodeId nodeId : groupIdsByNode.keySet()) {
208                 try {
209                     updateGroups(dataBroker, nodeId);
210                 } catch (ExecutionException | InterruptedException e) {
211                     LOG.error("Could not update Group table on node {}", nodeId);
212                 }
213             }
214
215             for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> newEntry : flowMap.entrySet()) {
216                 try {
217                     // Get actual flows on the same node/table
218                     Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualEntry = null;
219                     for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> a : actualFlowMap.entrySet()) {
220                         if (a.getKey().equals(newEntry.getKey())) {
221                             actualEntry = a;
222                         }
223                     }
224                     // Get the currently configured flows for this table
225                     updateFlowTable(dataBroker, newEntry, actualEntry);
226                     actualFlows.put(newEntry.getKey(), newEntry.getValue());
227                 } catch (Exception e) {
228                     LOG.warn("Couldn't read flow table {}", newEntry.getKey());
229                 }
230             }
231         }
232         return actualFlows;
233     }
234
235     private void updateFlowTable(DataBroker dataBroker, Map.Entry<InstanceIdentifier<Table>, TableBuilder> desiredFlowMap,
236                                  Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualFlowMap)
237             throws ExecutionException, InterruptedException {
238
239         // Actual state
240         List<Flow> actualFlows = new ArrayList<>();
241         if (actualFlowMap != null && actualFlowMap.getValue() != null) {
242             actualFlows = actualFlowMap.getValue().getFlow();
243         }
244         // New state
245         List<Flow> desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow());
246
247         // Sets with custom equivalence rules
248         Set<Equivalence.Wrapper<Flow>> wrappedActualFlows = new HashSet<>(
249                 Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
250         Set<Equivalence.Wrapper<Flow>> wrappedDesiredFlows = new HashSet<>(
251                 Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
252
253         // All gbp flows which are not updated will be removed
254         Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows);
255         // New flows (they were not there before)
256         Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows);
257
258         final InstanceIdentifier<Table> tableIid = desiredFlowMap.getKey();
259         ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
260
261         if (!deletions.isEmpty()) {
262             for (Equivalence.Wrapper<Flow> wf : deletions) {
263                 Flow f = wf.get();
264                 if (f != null) {
265                     t.delete(LogicalDatastoreType.CONFIGURATION,
266                             FlowUtils.createFlowPath(tableIid, f.getId()));
267                 }
268             }
269         }
270         if (!additions.isEmpty()) {
271             for (Equivalence.Wrapper<Flow> wf : additions) {
272                 Flow f = wf.get();
273                 if (f != null) {
274                     if (f.getMatch() == null) {
275                         f = new FlowBuilder(f).setMatch(new MatchBuilder().build()).build();
276                     }
277                     t.put(LogicalDatastoreType.CONFIGURATION,
278                             FlowUtils.createFlowPath(tableIid, f.getId()), f, true);
279                 }
280             }
281         }
282         CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
283         Futures.addCallback(f, new FutureCallback<Void>() {
284
285             @Override
286             public void onFailure(Throwable t) {
287                 LOG.error("Could not write flow table {}: {}", tableIid, t);
288             }
289
290             @Override
291             public void onSuccess(Void result) {
292                 LOG.debug("Flow table {} updated.", tableIid);
293             }
294         }, MoreExecutors.directExecutor());
295     }
296
297     private void updateGroups(DataBroker dataBroker, final NodeId nodeId)
298             throws ExecutionException, InterruptedException {
299
300         if (this.groupIdsByNode.get(nodeId) == null) {
301             this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
302         }
303         Set<GroupId> createdGroupIds = new HashSet<>(this.groupIdsByNode.get(nodeId));
304         // groups from inner structure
305         Set<Group> createdGroups = new HashSet<>();
306         for (GroupId gid : createdGroupIds) {
307             Group g = getGroupForNode(nodeId, gid.getValue());
308             if (g != null) {
309                 createdGroups.add(g);
310             }
311         }
312         // groups from datastore
313         Set<Group> existingGroups = new HashSet<>();
314         ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
315         FlowCapableNode fcn = null;
316         InstanceIdentifier<FlowCapableNode> fcniid =
317                 createNodePath(nodeId).builder().augmentation(FlowCapableNode.class).build();
318         Optional<FlowCapableNode> r = t.read(LogicalDatastoreType.OPERATIONAL, fcniid).get();
319         if (!r.isPresent()) {
320             LOG.warn("Node {} is not present", fcniid);
321             return;
322         }
323         fcn = r.get();
324
325         if (fcn.getGroup() != null) {
326             existingGroups = new HashSet<>(fcn.getGroup());
327         }
328
329         Set<Equivalence.Wrapper<Group>> existingGroupsWrap = new HashSet<>(
330                 Collections2.transform(existingGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
331         Set<Equivalence.Wrapper<Group>> createdGroupsWrap = new HashSet<>(
332                 Collections2.transform(createdGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
333
334         Sets.SetView<Equivalence.Wrapper<Group>> deletions =
335                 Sets.difference(existingGroupsWrap, createdGroupsWrap);
336         Sets.SetView<Equivalence.Wrapper<Group>> additions =
337                 Sets.difference(createdGroupsWrap, existingGroupsWrap);
338
339         if (!deletions.isEmpty()) {
340             for (Equivalence.Wrapper<Group> groupWrapper : deletions) {
341                 Group g = groupWrapper.get();
342                 if (g != null) {
343                     LOG.debug("Deleting group {} on node {}", g.getGroupId(), nodeId);
344                     t.delete(LogicalDatastoreType.CONFIGURATION,
345                             createGroupPath(nodeId, g.getGroupId()));
346                 }
347             }
348         }
349         if (!additions.isEmpty()) {
350             for (Equivalence.Wrapper<Group> groupWrapper : additions) {
351                 Group g = groupWrapper.get();
352                 if (g != null) {
353                     LOG.debug("Putting node {}, group {}", nodeId, g.getGroupId());
354                     t.put(LogicalDatastoreType.CONFIGURATION,
355                             createGroupPath(nodeId, g.getGroupId()), g, true);
356                 }
357             }
358         }
359
360         CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
361         Futures.addCallback(f, new FutureCallback<Void>() {
362
363             @Override
364             public void onFailure(Throwable t) {
365                 LOG.error("Could not write group table on node {}: {}", nodeId, t);
366             }
367
368             @Override
369             public void onSuccess(Void result) {
370                 LOG.debug("Group table on node {} updated.", nodeId);
371             }
372         }, MoreExecutors.directExecutor());
373     }
374
375 }