/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;
11 import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createGroupPath;
12 import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createNodePath;
14 import javax.annotation.Nullable;
15 import java.util.ArrayList;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.ExecutionException;
25 import com.google.common.base.Equivalence;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.collect.Collections2;
29 import com.google.common.collect.Sets;
30 import com.google.common.util.concurrent.CheckedFuture;
31 import com.google.common.util.concurrent.FutureCallback;
32 import com.google.common.util.concurrent.Futures;
33 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
34 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
38 import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.BucketsBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public class OfWriter {
56 private final ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap =
57 new ConcurrentHashMap<>();
58 private final ConcurrentMap<InstanceIdentifier<Group>, GroupBuilder> groupByIid =
59 new ConcurrentHashMap<>();
60 private final ConcurrentMap<NodeId, Set<GroupId>> groupIdsByNode = new ConcurrentHashMap<>();
62 private static final Logger LOG = LoggerFactory.getLogger(OfWriter.class);
64 public Table getTableForNode(NodeId nodeId, short tableId) {
65 return getTableBuilderForNode(nodeId, tableId).build();
68 private TableBuilder getTableBuilderForNode(NodeId nodeId, short tableId) {
69 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
70 if (this.flowMap.get(tableIid) == null) {
71 this.flowMap.put(tableIid,
72 new TableBuilder().setId(tableId).setFlow(new ArrayList<Flow>()));
74 return this.flowMap.get(tableIid);
77 public boolean groupExists(NodeId nodeId, long groupId) {
78 return (getGroupForNode(nodeId, groupId) != null);
82 * Gets group (or null if group does not exist) for node
84 * @param nodeId NodeId
86 * @return Group or null
88 private Group getGroupForNode(NodeId nodeId, long groupId) {
89 InstanceIdentifier<Group> giid = FlowUtils.createGroupPath(nodeId, groupId);
90 if (this.groupByIid.get(giid) == null) {
93 return this.groupByIid.get(giid).build();
97 * Short form of {@link #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)} with default parameters:<br>
98 * groupTypes = {@code GroupTypes.GroupAll}<br>
99 * containerName = null<br>
100 * groupName = null<br>
103 * @param nodeId NodeId
104 * @param groupId GroupId
105 * @see OfWriter#writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
107 public void writeGroup(NodeId nodeId, GroupId groupId) {
108 writeGroup(nodeId, groupId, GroupTypes.GroupAll, null, null, null);
112 * Writes a new group for OVS
114 * @param nodeId NodeId
115 * @param groupId GroupId
116 * @param groupTypes GroupTypes
117 * @param containerName String
118 * @param groupName String
119 * @param barrier Boolean
121 public void writeGroup(NodeId nodeId, GroupId groupId, @Nullable GroupTypes groupTypes,
122 @Nullable String containerName, @Nullable String groupName,
123 @Nullable Boolean barrier) {
124 Preconditions.checkNotNull(nodeId);
125 Preconditions.checkNotNull(groupId);
127 GroupBuilder gb = new GroupBuilder().setGroupId(groupId)
129 .setContainerName(containerName)
130 .setGroupName(groupName)
131 .setGroupType(groupTypes)
132 .setBuckets(new BucketsBuilder().setBucket(new ArrayList<Bucket>()).build());
134 groupByIid.put(FlowUtils.createGroupPath(nodeId, groupId), gb);
135 if (this.groupIdsByNode.get(nodeId) == null) {
136 this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
138 this.groupIdsByNode.get(nodeId).add(groupId);
142 * Writes a Bucket to Group.<br>
143 * Group has to be created previously,<br>
144 * or an IllegalStateException will be thrown.
146 * @param nodeId NodeId
147 * @param groupId GroupId
148 * @param bucket Bucket to be added to group
149 * @throws IllegalStateException if the Group is absent
150 * @see #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
151 * @see #writeGroup(NodeId, GroupId)
153 public void writeBucket(NodeId nodeId, GroupId groupId, Bucket bucket) {
154 Preconditions.checkNotNull(nodeId);
155 Preconditions.checkNotNull(groupId);
156 Preconditions.checkNotNull(bucket);
158 GroupBuilder gb = groupByIid.get(FlowUtils.createGroupPath(nodeId, groupId));
160 gb.getBuckets().getBucket().add(bucket);
162 LOG.error("Group {} on node {} does not exist", groupId, nodeId);
163 throw new IllegalStateException();
167 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
168 Preconditions.checkNotNull(flow);
169 Preconditions.checkNotNull(nodeId);
171 TableBuilder tableBuilder = this.getTableBuilderForNode(nodeId, tableId);
172 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
173 List<Flow> flows = tableBuilder.getFlow();
174 Set<Equivalence.Wrapper<Flow>> wrappedFlows = new HashSet<>(
175 Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
177 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
179 if (!wrappedFlows.contains(wFlow)) {
180 tableBuilder.getFlow().add(flow);
182 LOG.debug("Flow already exists in OfData - {}", flow);
187 * Update groups and flows on every node
188 * Only flows created by gbp - which are present in actualFlowMap - can be removed. It ensures no other flows
190 * Newly created flows are returned and will be used as actual in next update
192 * @param actualFlowMap map of flows which are currently present on all nodes
193 * @return map of newly created flows. These flows will be "actual" in next update
195 public Map<InstanceIdentifier<Table>, TableBuilder> commitToDataStore(DataBroker dataBroker,
196 Map<InstanceIdentifier<Table>, TableBuilder> actualFlowMap) {
197 Map<InstanceIdentifier<Table>, TableBuilder> actualFlows = new HashMap<>();
198 if (dataBroker != null) {
200 for (NodeId nodeId : groupIdsByNode.keySet()) {
202 updateGroups(dataBroker, nodeId);
203 } catch (ExecutionException | InterruptedException e) {
204 LOG.error("Could not update Group table on node {}", nodeId);
208 for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> newEntry : flowMap.entrySet()) {
210 // Get actual flows on the same node/table
211 Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualEntry = null;
212 for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> a : actualFlowMap.entrySet()) {
213 if (a.getKey().equals(newEntry.getKey())) {
217 // Get the currently configured flows for this table
218 updateFlowTable(dataBroker, newEntry, actualEntry);
219 actualFlows.put(newEntry.getKey(), newEntry.getValue());
220 } catch (Exception e) {
221 LOG.warn("Couldn't read flow table {}", newEntry.getKey());
228 private void updateFlowTable(DataBroker dataBroker, Map.Entry<InstanceIdentifier<Table>, TableBuilder> desiredFlowMap,
229 Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualFlowMap)
230 throws ExecutionException, InterruptedException {
233 List<Flow> actualFlows = new ArrayList<>();
234 if (actualFlowMap != null && actualFlowMap.getValue() != null) {
235 actualFlows = actualFlowMap.getValue().getFlow();
238 List<Flow> desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow());
240 // Sets with custom equivalence rules
241 Set<Equivalence.Wrapper<Flow>> wrappedActualFlows = new HashSet<>(
242 Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
243 Set<Equivalence.Wrapper<Flow>> wrappedDesiredFlows = new HashSet<>(
244 Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
246 // All gbp flows which are not updated will be removed
247 Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows);
248 // New flows (they were not there before)
249 Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows);
251 final InstanceIdentifier<Table> tableIid = desiredFlowMap.getKey();
252 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
254 if (!deletions.isEmpty()) {
255 for (Equivalence.Wrapper<Flow> wf : deletions) {
258 t.delete(LogicalDatastoreType.CONFIGURATION,
259 FlowUtils.createFlowPath(tableIid, f.getId()));
263 if (!additions.isEmpty()) {
264 for (Equivalence.Wrapper<Flow> wf : additions) {
267 t.put(LogicalDatastoreType.CONFIGURATION,
268 FlowUtils.createFlowPath(tableIid, f.getId()), f, true);
272 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
273 Futures.addCallback(f, new FutureCallback<Void>() {
276 public void onFailure(Throwable t) {
277 LOG.error("Could not write flow table {}: {}", tableIid, t);
281 public void onSuccess(Void result) {
282 LOG.debug("Flow table {} updated.", tableIid);
287 private void updateGroups(DataBroker dataBroker, final NodeId nodeId)
288 throws ExecutionException, InterruptedException {
290 if (this.groupIdsByNode.get(nodeId) == null) {
291 this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
293 Set<GroupId> createdGroupIds = new HashSet<>(this.groupIdsByNode.get(nodeId));
294 // groups from inner structure
295 Set<Group> createdGroups = new HashSet<>();
296 for (GroupId gid : createdGroupIds) {
297 Group g = getGroupForNode(nodeId, gid.getValue());
299 createdGroups.add(g);
302 // groups from datastore
303 Set<Group> existingGroups = new HashSet<>();
304 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
305 FlowCapableNode fcn = null;
306 InstanceIdentifier<FlowCapableNode> fcniid =
307 createNodePath(nodeId).builder().augmentation(FlowCapableNode.class).build();
308 Optional<FlowCapableNode> r = t.read(LogicalDatastoreType.OPERATIONAL, fcniid).get();
309 if (!r.isPresent()) {
310 LOG.warn("Node {} is not present", fcniid);
315 if (fcn.getGroup() != null) {
316 existingGroups = new HashSet<>(fcn.getGroup());
319 Set<Equivalence.Wrapper<Group>> existingGroupsWrap = new HashSet<>(
320 Collections2.transform(existingGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
321 Set<Equivalence.Wrapper<Group>> createdGroupsWrap = new HashSet<>(
322 Collections2.transform(createdGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
324 Sets.SetView<Equivalence.Wrapper<Group>> deletions =
325 Sets.difference(existingGroupsWrap, createdGroupsWrap);
326 Sets.SetView<Equivalence.Wrapper<Group>> additions =
327 Sets.difference(createdGroupsWrap, existingGroupsWrap);
329 if (!deletions.isEmpty()) {
330 for (Equivalence.Wrapper<Group> groupWrapper : deletions) {
331 Group g = groupWrapper.get();
333 LOG.debug("Deleting group {} on node {}", g.getGroupId(), nodeId);
334 t.delete(LogicalDatastoreType.CONFIGURATION,
335 createGroupPath(nodeId, g.getGroupId()));
339 if (!additions.isEmpty()) {
340 for (Equivalence.Wrapper<Group> groupWrapper : additions) {
341 Group g = groupWrapper.get();
343 LOG.debug("Putting node {}, group {}", nodeId, g.getGroupId());
344 t.put(LogicalDatastoreType.CONFIGURATION,
345 createGroupPath(nodeId, g.getGroupId()), g, true);
350 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
351 Futures.addCallback(f, new FutureCallback<Void>() {
354 public void onFailure(Throwable t) {
355 LOG.error("Could not write group table on node {}: {}", nodeId, t);
359 public void onSuccess(Void result) {
360 LOG.debug("Group table on node {} updated.", nodeId);