/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.groupbasedpolicy.renderer.ofoverlay;

import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createGroupPath;
import static org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils.createNodePath;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

import com.google.common.base.Equivalence;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.equivalence.EquivalenceFabric;
import org.opendaylight.groupbasedpolicy.renderer.ofoverlay.flow.FlowUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.MatchBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.BucketsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

58 public class OfWriter {
60 private final ConcurrentMap<InstanceIdentifier<Table>, TableBuilder> flowMap =
61 new ConcurrentHashMap<>();
62 private final ConcurrentMap<InstanceIdentifier<Group>, GroupBuilder> groupByIid =
63 new ConcurrentHashMap<>();
64 private final ConcurrentMap<NodeId, Set<GroupId>> groupIdsByNode = new ConcurrentHashMap<>();
66 private static final Logger LOG = LoggerFactory.getLogger(OfWriter.class);
68 public Table getTableForNode(NodeId nodeId, short tableId) {
69 return getTableBuilderForNode(nodeId, tableId).build();
72 private TableBuilder getTableBuilderForNode(NodeId nodeId, short tableId) {
73 InstanceIdentifier<Table> tableIid = FlowUtils.createTablePath(nodeId, tableId);
74 if (this.flowMap.get(tableIid) == null) {
75 this.flowMap.put(tableIid,
76 new TableBuilder().setId(tableId).setFlow(new ArrayList<Flow>()));
78 return this.flowMap.get(tableIid);
81 public boolean groupExists(NodeId nodeId, long groupId) {
82 return (getGroupForNode(nodeId, groupId) != null);
86 * Gets group (or null if group does not exist) for node
88 * @param nodeId NodeId
90 * @return Group or null
92 private Group getGroupForNode(NodeId nodeId, long groupId) {
93 InstanceIdentifier<Group> giid = FlowUtils.createGroupPath(nodeId, groupId);
94 if (this.groupByIid.get(giid) == null) {
97 return this.groupByIid.get(giid).build();
101 * Short form of {@link #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)} with default parameters:<br>
102 * groupTypes = {@code GroupTypes.GroupAll}<br>
103 * containerName = null<br>
104 * groupName = null<br>
107 * @param nodeId NodeId
108 * @param groupId GroupId
109 * @see OfWriter#writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
111 public void writeGroup(NodeId nodeId, GroupId groupId) {
112 writeGroup(nodeId, groupId, GroupTypes.GroupAll, null, null, null);
116 * Writes a new group for OVS
118 * @param nodeId NodeId
119 * @param groupId GroupId
120 * @param groupTypes GroupTypes
121 * @param containerName String
122 * @param groupName String
123 * @param barrier Boolean
125 public void writeGroup(NodeId nodeId, GroupId groupId, @Nullable GroupTypes groupTypes,
126 @Nullable String containerName, @Nullable String groupName,
127 @Nullable Boolean barrier) {
128 Preconditions.checkNotNull(nodeId);
129 Preconditions.checkNotNull(groupId);
131 GroupBuilder gb = new GroupBuilder().setGroupId(groupId)
133 .setContainerName(containerName)
134 .setGroupName(groupName)
135 .setGroupType(groupTypes)
136 .setBuckets(new BucketsBuilder().setBucket(new ArrayList<Bucket>()).build());
138 groupByIid.put(FlowUtils.createGroupPath(nodeId, groupId), gb);
139 if (this.groupIdsByNode.get(nodeId) == null) {
140 this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
142 this.groupIdsByNode.get(nodeId).add(groupId);
146 * Writes a Bucket to Group.<br>
147 * Group has to be created previously,<br>
148 * or an IllegalStateException will be thrown.
150 * @param nodeId NodeId
151 * @param groupId GroupId
152 * @param bucket Bucket to be added to group
153 * @throws IllegalStateException if the Group is absent
154 * @see #writeGroup(NodeId, GroupId, GroupTypes, String, String, Boolean)
155 * @see #writeGroup(NodeId, GroupId)
157 public void writeBucket(NodeId nodeId, GroupId groupId, Bucket bucket) {
158 Preconditions.checkNotNull(nodeId);
159 Preconditions.checkNotNull(groupId);
160 Preconditions.checkNotNull(bucket);
162 GroupBuilder gb = groupByIid.get(FlowUtils.createGroupPath(nodeId, groupId));
164 gb.getBuckets().getBucket().add(bucket);
166 LOG.error("Group {} on node {} does not exist", groupId, nodeId);
167 throw new IllegalStateException();
171 public void writeFlow(NodeId nodeId, short tableId, Flow flow) {
172 Preconditions.checkNotNull(flow);
173 Preconditions.checkNotNull(nodeId);
175 if (flow.getMatch() == null) {
176 flow = new FlowBuilder(flow).setMatch(new MatchBuilder().build()).build();
178 TableBuilder tableBuilder = this.getTableBuilderForNode(nodeId, tableId);
179 // transforming List<Flow> to Set (with customized equals/hashCode) to eliminate duplicate entries
180 List<Flow> flows = tableBuilder.getFlow();
181 Set<Equivalence.Wrapper<Flow>> wrappedFlows = new HashSet<>(
182 Collections2.transform(flows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
184 Equivalence.Wrapper<Flow> wFlow = EquivalenceFabric.FLOW_EQUIVALENCE.wrap(flow);
186 if (!wrappedFlows.contains(wFlow)) {
187 tableBuilder.getFlow().add(flow);
189 LOG.debug("Flow already exists in OfData - {}", flow);
194 * Update groups and flows on every node
195 * Only flows created by gbp - which are present in actualFlowMap - can be removed. It ensures no other flows
197 * Newly created flows are returned and will be used as actual in next update
199 * @param actualFlowMap map of flows which are currently present on all nodes
200 * @return map of newly created flows. These flows will be "actual" in next update
202 public Map<InstanceIdentifier<Table>, TableBuilder> commitToDataStore(DataBroker dataBroker,
203 Map<InstanceIdentifier<Table>, TableBuilder> actualFlowMap) {
204 Map<InstanceIdentifier<Table>, TableBuilder> actualFlows = new HashMap<>();
205 if (dataBroker != null) {
207 for (NodeId nodeId : groupIdsByNode.keySet()) {
209 updateGroups(dataBroker, nodeId);
210 } catch (ExecutionException | InterruptedException e) {
211 LOG.error("Could not update Group table on node {}", nodeId);
215 for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> newEntry : flowMap.entrySet()) {
217 // Get actual flows on the same node/table
218 Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualEntry = null;
219 for (Map.Entry<InstanceIdentifier<Table>, TableBuilder> a : actualFlowMap.entrySet()) {
220 if (a.getKey().equals(newEntry.getKey())) {
224 // Get the currently configured flows for this table
225 updateFlowTable(dataBroker, newEntry, actualEntry);
226 actualFlows.put(newEntry.getKey(), newEntry.getValue());
227 } catch (Exception e) {
228 LOG.warn("Couldn't read flow table {}", newEntry.getKey());
235 private void updateFlowTable(DataBroker dataBroker, Map.Entry<InstanceIdentifier<Table>, TableBuilder> desiredFlowMap,
236 Map.Entry<InstanceIdentifier<Table>, TableBuilder> actualFlowMap)
237 throws ExecutionException, InterruptedException {
240 List<Flow> actualFlows = new ArrayList<>();
241 if (actualFlowMap != null && actualFlowMap.getValue() != null) {
242 actualFlows = actualFlowMap.getValue().getFlow();
245 List<Flow> desiredFlows = new ArrayList<>(desiredFlowMap.getValue().getFlow());
247 // Sets with custom equivalence rules
248 Set<Equivalence.Wrapper<Flow>> wrappedActualFlows = new HashSet<>(
249 Collections2.transform(actualFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
250 Set<Equivalence.Wrapper<Flow>> wrappedDesiredFlows = new HashSet<>(
251 Collections2.transform(desiredFlows, EquivalenceFabric.FLOW_WRAPPER_FUNCTION));
253 // All gbp flows which are not updated will be removed
254 Sets.SetView<Equivalence.Wrapper<Flow>> deletions = Sets.difference(wrappedActualFlows, wrappedDesiredFlows);
255 // New flows (they were not there before)
256 Sets.SetView<Equivalence.Wrapper<Flow>> additions = Sets.difference(wrappedDesiredFlows, wrappedActualFlows);
258 final InstanceIdentifier<Table> tableIid = desiredFlowMap.getKey();
259 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
261 if (!deletions.isEmpty()) {
262 for (Equivalence.Wrapper<Flow> wf : deletions) {
265 t.delete(LogicalDatastoreType.CONFIGURATION,
266 FlowUtils.createFlowPath(tableIid, f.getId()));
270 if (!additions.isEmpty()) {
271 for (Equivalence.Wrapper<Flow> wf : additions) {
274 if (f.getMatch() == null) {
275 f = new FlowBuilder(f).setMatch(new MatchBuilder().build()).build();
277 t.put(LogicalDatastoreType.CONFIGURATION,
278 FlowUtils.createFlowPath(tableIid, f.getId()), f, true);
282 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
283 Futures.addCallback(f, new FutureCallback<Void>() {
286 public void onFailure(Throwable t) {
287 LOG.error("Could not write flow table {}: {}", tableIid, t);
291 public void onSuccess(Void result) {
292 LOG.debug("Flow table {} updated.", tableIid);
294 }, MoreExecutors.directExecutor());
297 private void updateGroups(DataBroker dataBroker, final NodeId nodeId)
298 throws ExecutionException, InterruptedException {
300 if (this.groupIdsByNode.get(nodeId) == null) {
301 this.groupIdsByNode.put(nodeId, new HashSet<GroupId>());
303 Set<GroupId> createdGroupIds = new HashSet<>(this.groupIdsByNode.get(nodeId));
304 // groups from inner structure
305 Set<Group> createdGroups = new HashSet<>();
306 for (GroupId gid : createdGroupIds) {
307 Group g = getGroupForNode(nodeId, gid.getValue());
309 createdGroups.add(g);
312 // groups from datastore
313 Set<Group> existingGroups = new HashSet<>();
314 ReadWriteTransaction t = dataBroker.newReadWriteTransaction();
315 FlowCapableNode fcn = null;
316 InstanceIdentifier<FlowCapableNode> fcniid =
317 createNodePath(nodeId).builder().augmentation(FlowCapableNode.class).build();
318 Optional<FlowCapableNode> r = t.read(LogicalDatastoreType.OPERATIONAL, fcniid).get();
319 if (!r.isPresent()) {
320 LOG.warn("Node {} is not present", fcniid);
325 if (fcn.getGroup() != null) {
326 existingGroups = new HashSet<>(fcn.getGroup());
329 Set<Equivalence.Wrapper<Group>> existingGroupsWrap = new HashSet<>(
330 Collections2.transform(existingGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
331 Set<Equivalence.Wrapper<Group>> createdGroupsWrap = new HashSet<>(
332 Collections2.transform(createdGroups, EquivalenceFabric.GROUP_WRAPPER_FUNCTION));
334 Sets.SetView<Equivalence.Wrapper<Group>> deletions =
335 Sets.difference(existingGroupsWrap, createdGroupsWrap);
336 Sets.SetView<Equivalence.Wrapper<Group>> additions =
337 Sets.difference(createdGroupsWrap, existingGroupsWrap);
339 if (!deletions.isEmpty()) {
340 for (Equivalence.Wrapper<Group> groupWrapper : deletions) {
341 Group g = groupWrapper.get();
343 LOG.debug("Deleting group {} on node {}", g.getGroupId(), nodeId);
344 t.delete(LogicalDatastoreType.CONFIGURATION,
345 createGroupPath(nodeId, g.getGroupId()));
349 if (!additions.isEmpty()) {
350 for (Equivalence.Wrapper<Group> groupWrapper : additions) {
351 Group g = groupWrapper.get();
353 LOG.debug("Putting node {}, group {}", nodeId, g.getGroupId());
354 t.put(LogicalDatastoreType.CONFIGURATION,
355 createGroupPath(nodeId, g.getGroupId()), g, true);
360 CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
361 Futures.addCallback(f, new FutureCallback<Void>() {
364 public void onFailure(Throwable t) {
365 LOG.error("Could not write group table on node {}: {}", nodeId, t);
369 public void onSuccess(Void result) {
370 LOG.debug("Group table on node {} updated.", nodeId);
372 }, MoreExecutors.directExecutor());