Failed to cancel service reconciliation when controller becomes slave.
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / BundleFlowForwarder.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getTableId;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
16
17 import com.google.common.base.Preconditions;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutionException;
28 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
29 import org.opendaylight.mdsal.binding.api.ReadTransaction;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.BundleInnerMessage;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
60 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
61 import org.opendaylight.yangtools.yang.common.RpcError;
62 import org.opendaylight.yangtools.yang.common.RpcResult;
63 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
64 import org.opendaylight.yangtools.yang.common.Uint32;
65 import org.opendaylight.yangtools.yang.common.Uint8;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
70
71     private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
72     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
73     private final ForwardingRulesManager forwardingRulesManager;
74     private final NodeConfigurator nodeConfigurator;
75
76     public BundleFlowForwarder(ForwardingRulesManager forwardingRulesManager) {
77         this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
78                 "ForwardingRulesManager can not be null!");
79         this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
80                 "NodeConfigurator can not be null!");
81     }
82
83     @Override
84     public void remove(final InstanceIdentifier<Flow> identifier, final Flow flow,
85             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
86         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
87         nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
88             final List<Message> messages = new ArrayList<>(1);
89             String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
90             BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
91                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
92             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
93                 .setBundleInnerMessage(bundleInnerMessage).build();
94             messages.add(message);
95             AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
96                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
97                 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
98             final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
99                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
100             LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
101                 bundleId.getValue(), node);
102             LoggingFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
103             return resultFuture;
104         });
105     }
106
107     @Override
108     public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
109             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
110         remove(identifier, originalFlow, nodeIdent, bundleId);
111         add(identifier, updatedFlow, nodeIdent, bundleId);
112     }
113
114     @Override
115     public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Flow> identifier,
116                                                                     final Flow flow,
117             final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
118         final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
119         return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
120             BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
121                     .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build();
122             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
123                     .setBundleInnerMessage(bundleInnerMessage).build();
124             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
125                     identifier, bundleId);
126             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
127             Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, identifier,
128                     resultFuture), MoreExecutors.directExecutor());
129             return resultFuture;
130         });
131     }
132
133     private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
134             final InstanceIdentifier<FlowCapableNode> nodeIdent, Flow updatedFlow, InstanceIdentifier<Flow> identifier,
135             BundleId bundleId) {
136         //TODO This read to the DS might have a performance impact.
137         //if the dependent group is not installed than we should just cache the parent group,
138         //till we receive the dependent group DTCN and then push it.
139         Uint32 groupId = isFlowDependentOnGroup(updatedFlow);
140         ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
141         if (groupId != null) {
142             LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
143                     getFlowId(new FlowRef(identifier)), groupId);
144             if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
145                 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
146                         getFlowId(new FlowRef(identifier)));
147                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
148             } else {
149                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
150                 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
151                 LOG.info("Reading the group from config inventory: {}", groupId);
152                 try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
153                     Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
154                     if (group.isPresent()) {
155                         final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
156                         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
157                         builder.setGroupRef(new GroupRef(nodeIdent));
158                         builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
159                         BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
160                                 .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
161                         Message groupMessage = new MessageBuilder().setNode(
162                                 new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
163                                 .setBundleInnerMessage(bundleInnerMessage).build();
164                         final List<Message> messages = new ArrayList<>(1);
165                         messages.add(groupMessage);
166                         AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
167                                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
168                                 .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build())
169                                 .build();
170                         LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
171                                 bundleId.getValue(), getNodeIdFromNodeIdentifier(nodeIdent));
172                         resultFuture = forwardingRulesManager
173                                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
174                         Futures.transformAsync(resultFuture, rpcResult -> {
175                             if (rpcResult.isSuccessful()) {
176                                 forwardingRulesManager.getDevicesGroupRegistry()
177                                         .storeGroup(getNodeIdFromNodeIdentifier(nodeIdent), groupId);
178                                 LOG.trace("Group {} stored in cache", groupId);
179                             }
180                             return Futures.immediateFuture(null);
181                         }, MoreExecutors.directExecutor());
182                     } else {
183                         LOG.debug("Group {} not present in the config inventory", groupId);
184                         resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>failed()
185                                 .withError(RpcError.ErrorType.APPLICATION,
186                                         "Group " + groupId + " not present in the config inventory").build());
187                     }
188                 } catch (InterruptedException | ExecutionException e) {
189                     LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
190                     resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>failed()
191                             .withError(RpcError.ErrorType.APPLICATION,
192                                     "Group " + groupId + " not present in the config inventory").build());
193                 }
194             }
195         } else {
196             resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
197         }
198         return resultFuture;
199     }
200
201     private final class BundleFlowCallBack implements FutureCallback<RpcResult<AddBundleMessagesOutput>> {
202         private final InstanceIdentifier<FlowCapableNode> nodeIdent;
203         private final BundleId bundleId;
204         private final Message messages;
205         private final NodeId nodeId;
206         private final String flowId;
207         private final Uint8 tableId;
208         private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
209
210         BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message messages,
211             InstanceIdentifier<Flow> identifier , SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
212             this.nodeIdent = nodeIdent;
213             this.bundleId = bundleId;
214             this.messages = messages;
215             this.resultFuture = resultFuture;
216             this.flowId = getFlowId(new FlowRef(identifier));
217             this.tableId = getTableId(new FlowTableRef(identifier));
218             nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
219         }
220
221         @Override
222         public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
223             if (rpcResult.isSuccessful()) {
224                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
225                         .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
226                         .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
227                                 Collections.singletonList(messages)).build()).build();
228
229                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
230                         bundleId.getValue(), nodeId.getValue());
231
232                 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
233                         forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
234                 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
235                     @Override
236                     public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
237                         resultFuture.set(result);
238                         if (!result.getErrors().isEmpty()) {
239                             LOG.error("Flow add with flowId {} and tableId {} failed for node {} with error: {}",
240                                     flowId, tableId, nodeId.getValue(), result.getErrors().toString());
241                         }
242
243                     }
244
245                     @Override
246                     public void onFailure(Throwable failure) {
247                         resultFuture.setException(failure);
248                     }
249                 },  MoreExecutors.directExecutor());
250             } else {
251                 LOG.error("Error {} while pushing flow add bundle {} for device {}", rpcResult.getErrors(), messages,
252                         nodeId.getValue());
253                 resultFuture.set(rpcResult);
254             }
255         }
256
257         @Override
258         public void onFailure(Throwable throwable) {
259             LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId.getValue());
260             resultFuture.setException(throwable);
261         }
262     }
263 }