Bump MRI upstreams
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / BundleFlowForwarder.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getTableId;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
16 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
17
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutionException;
28 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
29 import org.opendaylight.mdsal.binding.api.ReadTransaction;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.common.ErrorType;
59 import org.opendaylight.yangtools.yang.common.RpcResult;
60 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
61 import org.opendaylight.yangtools.yang.common.Uint32;
62 import org.opendaylight.yangtools.yang.common.Uint8;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
67     private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
68     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
69
70     private final ForwardingRulesManager forwardingRulesManager;
71     private final NodeConfigurator nodeConfigurator;
72
73     public BundleFlowForwarder(final ForwardingRulesManager forwardingRulesManager) {
74         this.forwardingRulesManager = requireNonNull(forwardingRulesManager, "ForwardingRulesManager can not be null!");
75         nodeConfigurator = requireNonNull(forwardingRulesManager.getNodeConfigurator(),
76                 "NodeConfigurator can not be null!");
77     }
78
79     @Override
80     public void remove(final InstanceIdentifier<Flow> identifier,
81                        final Flow flow,
82                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
83                        final BundleId bundleId) {
84         final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
85         nodeConfigurator.enqueueJob(nodeId, () -> {
86             final List<Message> messages = new ArrayList<>(1);
87             String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
88             BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
89                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
90             Message message = new MessageBuilder()
91                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
92                 .setBundleInnerMessage(bundleInnerMessage)
93                 .build();
94             messages.add(message);
95             AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
96                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
97                 .setBundleId(bundleId)
98                 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder()
99                         .setMessage(messages)
100                         .build())
101                 .build();
102             final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
103                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
104             LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
105                 bundleId.getValue(), node);
106             LoggingFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
107             return resultFuture;
108         });
109     }
110
111     @Override
112     public void update(final InstanceIdentifier<Flow> identifier,
113                        final Flow originalFlow,
114                        final Flow updatedFlow,
115                        final InstanceIdentifier<FlowCapableNode> nodeIdent,
116                        final BundleId bundleId) {
117         remove(identifier, originalFlow, nodeIdent, bundleId);
118         add(identifier, updatedFlow, nodeIdent, bundleId);
119     }
120
121     @Override
122     public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Flow> identifier,
123                                                                     final Flow flow,
124                                                                     final InstanceIdentifier<FlowCapableNode> nodeIdent,
125                                                                     final BundleId bundleId) {
126         final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
127         return nodeConfigurator.enqueueJob(nodeId, () -> {
128             BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
129                     .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow)
130                             .build())
131                     .build();
132             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
133                     .setBundleInnerMessage(bundleInnerMessage)
134                     .build();
135             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
136                     identifier, bundleId);
137             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
138             Futures.addCallback(groupFuture,
139                     new BundleFlowCallBack(nodeIdent, bundleId, message, identifier, resultFuture),
140                     MoreExecutors.directExecutor());
141             return resultFuture;
142         });
143     }
144
145     private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
146             final InstanceIdentifier<FlowCapableNode> nodeIdent,
147             final Flow updatedFlow,
148             final InstanceIdentifier<Flow> identifier,
149             final BundleId bundleId) {
150         //TODO This read to the DS might have a performance impact.
151         //if the dependent group is not installed than we should just cache the parent group,
152         //till we receive the dependent group DTCN and then push it.
153         Uint32 groupId = isFlowDependentOnGroup(updatedFlow);
154         ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
155         if (groupId != null) {
156             LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
157                     getFlowId(identifier), groupId);
158             if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
159                 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
160                         getFlowId(identifier));
161                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
162             } else {
163                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
164                 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
165                 LOG.info("Reading the group from config inventory: {}", groupId);
166                 try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
167                     Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
168                     if (group.isPresent()) {
169                         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
170                         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
171                         builder.setGroupRef(new GroupRef(nodeIdent));
172                         builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
173                         BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
174                                 .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
175                         Message groupMessage = new MessageBuilder()
176                                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
177                                 .setBundleInnerMessage(bundleInnerMessage)
178                                 .build();
179                         final List<Message> messages = new ArrayList<>(1);
180                         messages.add(groupMessage);
181                         AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
182                                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
183                                 .setBundleId(bundleId)
184                                 .setFlags(BUNDLE_FLAGS)
185                                 .setMessages(new MessagesBuilder()
186                                         .setMessage(messages).build())
187                                 .build();
188                         LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
189                                 bundleId.getValue(), getNodeIdValueFromNodeIdentifier(nodeIdent));
190                         resultFuture = forwardingRulesManager
191                                 .getSalBundleService()
192                                 .addBundleMessages(addBundleMessagesInput);
193                         Futures.transformAsync(resultFuture, rpcResult -> {
194                             if (rpcResult.isSuccessful()) {
195                                 forwardingRulesManager.getDevicesGroupRegistry()
196                                         .storeGroup(getNodeIdValueFromNodeIdentifier(nodeIdent), groupId);
197                                 LOG.trace("Group {} stored in cache", groupId);
198                             }
199                             return Futures.immediateFuture(null);
200                         }, MoreExecutors.directExecutor());
201                     } else {
202                         LOG.debug("Group {} not present in the config inventory", groupId);
203                         resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
204                                 .withError(ErrorType.APPLICATION,
205                                         "Group " + groupId + " not present in the config inventory").buildFuture();
206                     }
207                 } catch (InterruptedException | ExecutionException e) {
208                     LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
209                     resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
210                             .withError(ErrorType.APPLICATION,
211                                     "Group " + groupId + " not present in the config inventory").buildFuture();
212                 }
213             }
214         } else {
215             resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>success().buildFuture();
216         }
217         return resultFuture;
218     }
219
220     private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
221         private final InstanceIdentifier<FlowCapableNode> nodeIdent;
222         private final BundleId bundleId;
223         private final Message messages;
224         private final String nodeId;
225         private final String flowId;
226         private final Uint8 tableId;
227         private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
228
229         BundleFlowCallBack(final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId,
230                 final Message messages, final InstanceIdentifier<Flow> identifier,
231                 final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
232             this.nodeIdent = nodeIdent;
233             this.bundleId = bundleId;
234             this.messages = messages;
235             this.resultFuture = resultFuture;
236             flowId = getFlowId(identifier);
237             tableId = getTableId(identifier);
238             nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
239         }
240
241         @Override
242         public void onSuccess(final RpcResult<AddBundleMessagesOutput> rpcResult) {
243             if (rpcResult.isSuccessful()) {
244                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
245                         .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
246                         .setBundleId(bundleId)
247                         .setFlags(BUNDLE_FLAGS)
248                         .setMessages(new MessagesBuilder()
249                                 .setMessage(
250                                         Collections.singletonList(messages)).build())
251                         .build();
252
253                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
254                         bundleId.getValue(), nodeId);
255
256                 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
257                         forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
258                 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
259                     @Override
260                     public void onSuccess(final RpcResult<AddBundleMessagesOutput> result) {
261                         resultFuture.set(result);
262                         if (!result.getErrors().isEmpty()) {
263                             LOG.error("Flow add with flowId {} and tableId {} failed for node {} with error: {}",
264                                     flowId, tableId, nodeId, result.getErrors().toString());
265                         }
266
267                     }
268
269                     @Override
270                     public void onFailure(final Throwable failure) {
271                         resultFuture.setException(failure);
272                     }
273                 },  MoreExecutors.directExecutor());
274             } else {
275                 LOG.error("Error {} while pushing flow add bundle {} for device {}", rpcResult.getErrors(), messages,
276                         nodeId);
277                 resultFuture.set(rpcResult);
278             }
279         }
280
281         @Override
282         public void onFailure(final Throwable throwable) {
283             LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId);
284             resultFuture.setException(throwable);
285         }
286     }
287 }