Merge "fix incorrect Future usage in (Bundle)FlowForwarder"
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / BundleFlowForwarder.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frm.impl;
10
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
16
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleUpdateFlowCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.update.flow._case.UpdateFlowCaseDataBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.opendaylight.yangtools.yang.common.RpcResult;
63 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 public class BundleFlowForwarder {
68
69     private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
70     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
71     private final ForwardingRulesManager forwardingRulesManager;
72     private final NodeConfigurator nodeConfigurator;
73
74     public BundleFlowForwarder(ForwardingRulesManager forwardingRulesManager) {
75         this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
76                 "ForwardingRulesManager can not be null!");
77         this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
78                 "NodeConfigurator can not be null!");
79     }
80
81     public void remove(final InstanceIdentifier<Flow> identifier, final Flow flow,
82             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
83         final List<Message> messages = new ArrayList<>(1);
84         String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
85         BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
86                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
87         Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
88                 .setBundleInnerMessage(bundleInnerMessage).build();
89         messages.add(message);
90         AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
91                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
92                 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
93         final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
94                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
95         LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
96                 bundleId.getValue(), node);
97         JdkFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
98     }
99
100     public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
101             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
102         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
103         nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
104             BundleInnerMessage bundleInnerMessage = new BundleUpdateFlowCaseBuilder()
105                     .setUpdateFlowCaseData(new UpdateFlowCaseDataBuilder(updatedFlow).build()).build();
106             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
107                     .setBundleInnerMessage(bundleInnerMessage).build();
108             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
109                     updatedFlow, identifier, bundleId);
110             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
111             Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
112                     MoreExecutors.directExecutor());
113             return resultFuture;
114         });
115     }
116
117     public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow flow,
118             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
119         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
120         return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
121             BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
122                     .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build();
123             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
124                     .setBundleInnerMessage(bundleInnerMessage).build();
125             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
126                     identifier, bundleId);
127             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
128             Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
129                     MoreExecutors.directExecutor());
130             return resultFuture;
131         });
132     }
133
134     private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
135             final InstanceIdentifier<FlowCapableNode> nodeIdent, Flow updatedFlow, InstanceIdentifier<Flow> identifier,
136             BundleId bundleId) {
137         //TODO This read to the DS might have a performance impact.
138         //if the dependent group is not installed than we should just cache the parent group,
139         //till we receive the dependent group DTCN and then push it.
140         Long groupId = isFlowDependentOnGroup(updatedFlow);
141         ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
142         if (groupId != null) {
143             LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
144                     getFlowId(new FlowRef(identifier)), groupId);
145             if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
146                 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
147                         getFlowId(new FlowRef(identifier)));
148                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
149             } else {
150                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
151                 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
152                 LOG.info("Reading the group from config inventory: {}", groupId);
153                 try (ReadOnlyTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
154                     Optional<Group> group = readTransaction
155                             .read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
156                     if (group.isPresent()) {
157                         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
158                         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
159                         builder.setGroupRef(new GroupRef(nodeIdent));
160                         builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
161                         BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
162                                 .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
163                         Message groupMessage = new MessageBuilder().setNode(
164                                 new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
165                                 .setBundleInnerMessage(bundleInnerMessage).build();
166                         final List<Message> messages = new ArrayList<>(1);
167                         messages.add(groupMessage);
168                         AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
169                                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
170                                 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build())
171                                 .build();
172                         LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
173                                 bundleId.getValue(), getNodeIdFromNodeIdentifier(nodeIdent));
174                         resultFuture = forwardingRulesManager
175                                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
176                         Futures.transformAsync(resultFuture, rpcResult -> {
177                             if (rpcResult.isSuccessful()) {
178                                 forwardingRulesManager.getDevicesGroupRegistry()
179                                         .storeGroup(getNodeIdFromNodeIdentifier(nodeIdent), groupId);
180                                 LOG.trace("Group {} stored in cache", groupId);
181                             }
182                             return Futures.immediateFuture(null);
183                         }, MoreExecutors.directExecutor());
184                     } else {
185                         LOG.debug("Group {} not present in the config inventory", groupId);
186                         resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success()
187                                 .build());
188                     }
189                 } catch (InterruptedException | ExecutionException e) {
190                     LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
191                     resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
192                 }
193             }
194         } else {
195             resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
196         }
197         return resultFuture;
198     }
199
200     private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
201         private final InstanceIdentifier<FlowCapableNode> nodeIdent;
202         private final BundleId bundleId;
203         private final Message message;
204         private final NodeId nodeId;
205         private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
206
207         BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message message,
208                 SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
209             this.nodeIdent = nodeIdent;
210             this.bundleId = bundleId;
211             this.message = message;
212             this.resultFuture = resultFuture;
213             nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
214         }
215
216         @Override
217         public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
218             if (rpcResult.isSuccessful()) {
219                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
220                         .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
221                         .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
222                                 Collections.singletonList(message)).build()).build();
223
224                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
225                         bundleId.getValue(), nodeId.getValue());
226
227                 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
228                         forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
229                 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
230                     @Override
231                     public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
232                         resultFuture.set(result);
233                     }
234
235                     @Override
236                     public void onFailure(Throwable failure) {
237                         resultFuture.setException(failure);
238                     }
239                 },  MoreExecutors.directExecutor());
240             } else {
241                 resultFuture.set(rpcResult);
242             }
243         }
244
245         @Override
246         public void onFailure(Throwable throwable) {
247             LOG.error("Error while pushing flow add bundle {} for device {}", message, nodeId);
248         }
249     }
250 }