/*
 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getTableId;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
16 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutionException;
28 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
29 import org.opendaylight.mdsal.binding.api.ReadTransaction;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.common.ErrorType;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
61 import org.opendaylight.yangtools.yang.common.Uint32;
62 import org.opendaylight.yangtools.yang.common.Uint8;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
66 public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
67 private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
68 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
70 private final ForwardingRulesManager forwardingRulesManager;
71 private final NodeConfigurator nodeConfigurator;
73 public BundleFlowForwarder(final ForwardingRulesManager forwardingRulesManager) {
74 this.forwardingRulesManager = requireNonNull(forwardingRulesManager, "ForwardingRulesManager can not be null!");
75 nodeConfigurator = requireNonNull(forwardingRulesManager.getNodeConfigurator(),
76 "NodeConfigurator can not be null!");
80 public void remove(final InstanceIdentifier<Flow> identifier,
82 final InstanceIdentifier<FlowCapableNode> nodeIdent,
83 final BundleId bundleId) {
84 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
85 nodeConfigurator.enqueueJob(nodeId, () -> {
86 String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
87 final var addBundleMessagesInput = new AddBundleMessagesInputBuilder()
88 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
89 .setBundleId(bundleId)
90 .setFlags(BUNDLE_FLAGS)
91 .setMessages(new MessagesBuilder()
92 .setMessage(List.of(new MessageBuilder()
93 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
94 .setBundleInnerMessage(new BundleRemoveFlowCaseBuilder()
95 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build())
100 LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
101 bundleId.getValue(), node);
102 return LoggingFutures.addErrorLogging(
103 forwardingRulesManager.addBundleMessages().invoke(addBundleMessagesInput),
104 LOG, "removeBundleFlow");
109 public void update(final InstanceIdentifier<Flow> identifier,
110 final Flow originalFlow,
111 final Flow updatedFlow,
112 final InstanceIdentifier<FlowCapableNode> nodeIdent,
113 final BundleId bundleId) {
114 remove(identifier, originalFlow, nodeIdent, bundleId);
115 add(identifier, updatedFlow, nodeIdent, bundleId);
119 public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Flow> identifier,
121 final InstanceIdentifier<FlowCapableNode> nodeIdent,
122 final BundleId bundleId) {
123 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
124 return nodeConfigurator.enqueueJob(nodeId, () -> {
125 BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
126 .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow)
129 Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
130 .setBundleInnerMessage(bundleInnerMessage)
132 ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
133 identifier, bundleId);
134 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
135 Futures.addCallback(groupFuture,
136 new BundleFlowCallBack(nodeIdent, bundleId, message, identifier, resultFuture),
137 MoreExecutors.directExecutor());
142 private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
143 final InstanceIdentifier<FlowCapableNode> nodeIdent,
144 final Flow updatedFlow,
145 final InstanceIdentifier<Flow> identifier,
146 final BundleId bundleId) {
147 //TODO This read to the DS might have a performance impact.
148 //if the dependent group is not installed than we should just cache the parent group,
149 //till we receive the dependent group DTCN and then push it.
150 Uint32 groupId = isFlowDependentOnGroup(updatedFlow);
151 ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
152 if (groupId != null) {
153 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
154 getFlowId(identifier), groupId);
155 if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
156 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
157 getFlowId(identifier));
158 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
160 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
161 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
162 LOG.info("Reading the group from config inventory: {}", groupId);
163 try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
164 Optional<Group> optGroup = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent)
166 if (optGroup.isPresent()) {
167 final Group group = optGroup.orElseThrow();
168 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
169 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
170 builder.setGroupRef(new GroupRef(nodeIdent));
171 builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
172 BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
173 .setAddGroupCaseData(new AddGroupCaseDataBuilder(group).build()).build();
174 Message groupMessage = new MessageBuilder()
175 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
176 .setBundleInnerMessage(bundleInnerMessage)
178 final List<Message> messages = new ArrayList<>(1);
179 messages.add(groupMessage);
180 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
181 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
182 .setBundleId(bundleId)
183 .setFlags(BUNDLE_FLAGS)
184 .setMessages(new MessagesBuilder()
185 .setMessage(messages).build())
187 LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
188 bundleId.getValue(), getNodeIdValueFromNodeIdentifier(nodeIdent));
189 resultFuture = forwardingRulesManager.addBundleMessages().invoke(addBundleMessagesInput);
190 Futures.transformAsync(resultFuture, rpcResult -> {
191 if (rpcResult.isSuccessful()) {
192 forwardingRulesManager.getDevicesGroupRegistry()
193 .storeGroup(getNodeIdValueFromNodeIdentifier(nodeIdent), groupId);
194 LOG.trace("Group {} stored in cache", groupId);
196 return Futures.immediateFuture(null);
197 }, MoreExecutors.directExecutor());
199 LOG.debug("Group {} not present in the config inventory", groupId);
200 resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
201 .withError(ErrorType.APPLICATION,
202 "Group " + groupId + " not present in the config inventory").buildFuture();
204 } catch (InterruptedException | ExecutionException e) {
205 LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
206 resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
207 .withError(ErrorType.APPLICATION,
208 "Group " + groupId + " not present in the config inventory").buildFuture();
212 resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>success().buildFuture();
217 private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
218 private final InstanceIdentifier<FlowCapableNode> nodeIdent;
219 private final BundleId bundleId;
220 private final Message messages;
221 private final String nodeId;
222 private final String flowId;
223 private final Uint8 tableId;
224 private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
226 BundleFlowCallBack(final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId,
227 final Message messages, final InstanceIdentifier<Flow> identifier,
228 final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
229 this.nodeIdent = nodeIdent;
230 this.bundleId = bundleId;
231 this.messages = messages;
232 this.resultFuture = resultFuture;
233 flowId = getFlowId(identifier);
234 tableId = getTableId(identifier);
235 nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
239 public void onSuccess(final RpcResult<AddBundleMessagesOutput> rpcResult) {
240 if (rpcResult.isSuccessful()) {
241 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
242 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
243 .setBundleId(bundleId)
244 .setFlags(BUNDLE_FLAGS)
245 .setMessages(new MessagesBuilder()
247 Collections.singletonList(messages)).build())
250 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
251 bundleId.getValue(), nodeId);
253 final var addFuture = forwardingRulesManager.addBundleMessages().invoke(addBundleMessagesInput);
254 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
256 public void onSuccess(final RpcResult<AddBundleMessagesOutput> result) {
257 resultFuture.set(result);
258 if (!result.getErrors().isEmpty()) {
259 LOG.error("Flow add with flowId {} and tableId {} failed for node {} with error: {}",
260 flowId, tableId, nodeId, result.getErrors().toString());
265 public void onFailure(final Throwable failure) {
266 resultFuture.setException(failure);
268 }, MoreExecutors.directExecutor());
270 LOG.error("Error {} while pushing flow add bundle {} for device {}", rpcResult.getErrors(), messages,
272 resultFuture.set(rpcResult);
277 public void onFailure(final Throwable throwable) {
278 LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId);
279 resultFuture.setException(throwable);