/*
 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleUpdateFlowCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.update.flow._case.UpdateFlowCaseDataBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.opendaylight.yangtools.yang.common.RpcResult;
63 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
67 public class BundleFlowForwarder {
69 private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
70 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
71 private final ForwardingRulesManager forwardingRulesManager;
72 private final NodeConfigurator nodeConfigurator;
74 public BundleFlowForwarder(ForwardingRulesManager forwardingRulesManager) {
75 this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
76 "ForwardingRulesManager can not be null!");
77 this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
78 "NodeConfigurator can not be null!");
81 public void remove(final InstanceIdentifier<Flow> identifier, final Flow flow,
82 final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
83 final List<Message> messages = new ArrayList<>(1);
84 String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
85 BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
86 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
87 Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
88 .setBundleInnerMessage(bundleInnerMessage).build();
89 messages.add(message);
90 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
91 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
92 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
93 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
94 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
95 LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
96 bundleId.getValue(), node);
97 JdkFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
100 public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
101 final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
102 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
103 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
104 BundleInnerMessage innerDeleteMessage = new BundleRemoveFlowCaseBuilder()
105 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(originalFlow).build()).build();
106 Message deleteMessage = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
107 .setBundleInnerMessage(innerDeleteMessage).build();
108 BundleInnerMessage innerUpdateMessage = new BundleUpdateFlowCaseBuilder()
109 .setUpdateFlowCaseData(new UpdateFlowCaseDataBuilder(updatedFlow).build()).build();
110 Message updateMessage = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
111 .setBundleInnerMessage(innerUpdateMessage).build();
112 ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
113 updatedFlow, identifier, bundleId);
114 List<Message> messages = Arrays.asList(deleteMessage, updateMessage);
115 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
116 Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, messages, resultFuture),
117 MoreExecutors.directExecutor());
122 public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow flow,
123 final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
124 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
125 return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
126 BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
127 .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build();
128 Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
129 .setBundleInnerMessage(bundleInnerMessage).build();
130 ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
131 identifier, bundleId);
132 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
133 Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId,
134 Collections.singletonList(message), resultFuture),
135 MoreExecutors.directExecutor());
140 private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
141 final InstanceIdentifier<FlowCapableNode> nodeIdent, Flow updatedFlow, InstanceIdentifier<Flow> identifier,
143 //TODO This read to the DS might have a performance impact.
144 //if the dependent group is not installed than we should just cache the parent group,
145 //till we receive the dependent group DTCN and then push it.
146 Long groupId = isFlowDependentOnGroup(updatedFlow);
147 ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
148 if (groupId != null) {
149 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
150 getFlowId(new FlowRef(identifier)), groupId);
151 if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
152 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
153 getFlowId(new FlowRef(identifier)));
154 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
156 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
157 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
158 LOG.info("Reading the group from config inventory: {}", groupId);
159 try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
160 Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
161 if (group.isPresent()) {
162 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
163 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
164 builder.setGroupRef(new GroupRef(nodeIdent));
165 builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
166 BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
167 .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
168 Message groupMessage = new MessageBuilder().setNode(
169 new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
170 .setBundleInnerMessage(bundleInnerMessage).build();
171 final List<Message> messages = new ArrayList<>(1);
172 messages.add(groupMessage);
173 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
174 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
175 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build())
177 LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
178 bundleId.getValue(), getNodeIdFromNodeIdentifier(nodeIdent));
179 resultFuture = forwardingRulesManager
180 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
181 Futures.transformAsync(resultFuture, rpcResult -> {
182 if (rpcResult.isSuccessful()) {
183 forwardingRulesManager.getDevicesGroupRegistry()
184 .storeGroup(getNodeIdFromNodeIdentifier(nodeIdent), groupId);
185 LOG.trace("Group {} stored in cache", groupId);
187 return Futures.immediateFuture(null);
188 }, MoreExecutors.directExecutor());
190 LOG.debug("Group {} not present in the config inventory", groupId);
191 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success()
194 } catch (InterruptedException | ExecutionException e) {
195 LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
196 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
200 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
205 private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
206 private final InstanceIdentifier<FlowCapableNode> nodeIdent;
207 private final BundleId bundleId;
208 private final List<Message> messages;
209 private final NodeId nodeId;
210 private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
212 BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, List<Message> messages,
213 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
214 this.nodeIdent = nodeIdent;
215 this.bundleId = bundleId;
216 this.messages = messages;
217 this.resultFuture = resultFuture;
218 nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
222 public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
223 if (rpcResult.isSuccessful()) {
224 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
225 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
226 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
228 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
229 bundleId.getValue(), nodeId.getValue());
231 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
232 forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
233 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
235 public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
236 resultFuture.set(result);
240 public void onFailure(Throwable failure) {
241 resultFuture.setException(failure);
243 }, MoreExecutors.directExecutor());
245 resultFuture.set(rpcResult);
250 public void onFailure(Throwable throwable) {
251 LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId);
252 resultFuture.setException(throwable);