OPNFLWPLUG-1032: Neon-MRI: Bump odlparent, yangtools, mdsal
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18
19 import java.math.BigInteger;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
32 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
33 import org.opendaylight.openflowplugin.api.OFConstants;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
36 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
37 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
38 import org.opendaylight.serviceutils.upgrade.UpgradeState;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
79 import org.opendaylight.yangtools.yang.common.RpcError;
80 import org.opendaylight.yangtools.yang.common.RpcResult;
81 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
86         ReconciliationNotificationListener, AutoCloseable {
87
88     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
89     private static final int THREAD_POOL_SIZE = 4;
90     private static final AtomicLong BUNDLE_ID = new AtomicLong();
91     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
92     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
93             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
94     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
95     private static final String SEPARATOR = ":";
96
97     private final SalBundleService salBundleService;
98     private final ReconciliationManager reconciliationManager;
99     private final RoutedRpcRegistration routedRpcReg;
100     private final UpgradeState upgradeState;
101     private NotificationRegistration registration;
102     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
103     private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
104
105     public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry,
106             final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) {
107         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
108         this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
109                 "ReconciliationManager cannot be null!");
110         this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
111                 "RPC SalBundlService not found.");
112         this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
113                 this);
114         this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
115     }
116
117     public void start() {
118         registration = reconciliationManager.registerService(this);
119         LOG.info("ArbitratorReconciliationManager has started successfully.");
120     }
121
122     @Override
123     public void close() throws Exception {
124         executor.shutdown();
125         if (registration != null) {
126             registration.close();
127             registration = null;
128         }
129     }
130
131     @Override
132     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
133             CommitActiveBundleInput input) {
134         BigInteger nodeId = input.getNodeId();
135         if (bundleIdMap.containsKey(nodeId)) {
136             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
137             if (bundleId != null) {
138                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
139                         .setNode(input.getNode()).setBundleId(bundleId)
140                         .setFlags(BUNDLE_FLAGS)
141                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
142                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
143                         .controlBundle(commitBundleInput);
144                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
145                 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
146                         MoreExecutors.directExecutor());
147                 return Futures.transform(
148                         rpcResult,
149                         this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
150                         MoreExecutors.directExecutor());
151             }
152         }
153         return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder()
154                 .setResult(null).build()))
155                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
156                 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
157     }
158
159     @Override
160     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
161         BigInteger nodeId = input.getNodeId();
162         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
163         if (bundleDetails != null) {
164             try {
165                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
166                 // pipeline when the commit bundle is ongoing.
167                 bundleDetails.getResult().get();
168                 return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
169                         .setResult(bundleDetails.getBundleId()).build())).buildFuture();
170             } catch (InterruptedException | ExecutionException | NullPointerException e) {
171                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
172                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
173                                 null, e.getMessage()))).buildFuture();
174             }
175         }
176         return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
177                 .setResult(null).build())).buildFuture();
178     }
179
180     @Override
181     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
182         registerRpc(node);
183         if (upgradeState.isUpgradeInProgress()) {
184             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
185             return reconcileConfiguration(node);
186         }
187         LOG.trace("arbitrator reconciliation is disabled");
188         return Futures.immediateFuture(true);
189     }
190
191     @Override
192     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
193         LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
194         InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
195                 .augmentation(FlowCapableNode.class);
196         bundleIdMap.remove(connectedNode);
197         deregisterRpc(node);
198         return Futures.immediateFuture(true);
199     }
200
201     @Override
202     public int getPriority() {
203         return ARBITRATOR_RECONCILIATION_PRIORITY;
204     }
205
206     @Override
207     public String getName() {
208         return SERVICE_NAME;
209     }
210
211     @Override
212     public ResultState getResultState() {
213         return ResultState.DONOTHING;
214     }
215
216     private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
217         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
218         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
219         return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
220     }
221
222     private Messages createMessages(final NodeRef nodeRef) {
223         final List<Message> messages = new ArrayList<>();
224         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
225                 new BundleRemoveFlowCaseBuilder()
226                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
227                 .build()).build());
228
229         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
230                 new BundleRemoveGroupCaseBuilder()
231                 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
232                 .build()).build());
233         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
234         return new MessagesBuilder().setMessage(messages).build();
235     }
236
237     private Flow getDeleteAllFlow() {
238         final FlowBuilder flowBuilder = new FlowBuilder();
239         flowBuilder.setTableId(OFConstants.OFPTT_ALL);
240         return flowBuilder.build();
241     }
242
243     private Group getDeleteAllGroup() {
244         final GroupBuilder groupBuilder = new GroupBuilder();
245         groupBuilder.setGroupType(GroupTypes.GroupAll);
246         groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
247         return groupBuilder.build();
248     }
249
250     private class ArbitratorReconciliationTask implements Callable<Boolean> {
251         final DeviceInfo deviceInfo;
252
253         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
254             this.deviceInfo = deviceInfo;
255         }
256
257         @Override
258         public Boolean call() {
259             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
260                     .augmentation(FlowCapableNode.class);
261             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
262             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
263             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
264             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
265
266             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
267                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
268                     .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
269
270             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
271                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
272                     .setType(BundleControlType.ONFBCTOPENREQUEST).build();
273
274             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
275                     .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
276                     .setMessages(createMessages(nodeRef)).build();
277
278             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
279                     .controlBundle(closeBundleInput);
280
281             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
282                     .transformAsync(closeBundle, rpcResult -> salBundleService
283                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
284
285             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
286                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
287                         if (rpcResult.isSuccessful()) {
288                             return salBundleService
289                                     .addBundleMessages(addBundleMessagesInput);
290                         }
291                         return Futures.immediateFuture(null);
292                     }, MoreExecutors.directExecutor());
293             BigInteger nodeId = getDpnIdFromNodeName(node);
294             try {
295                 if (addBundleMessagesFuture.get().isSuccessful()) {
296                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
297                             Futures.immediateFuture(null)));
298                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
299                             + " for application programming.", nodeId);
300                     return true;
301                 } else {
302                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
303                     return false;
304                 }
305             } catch (InterruptedException | ExecutionException e) {
306                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
307                 return false;
308             }
309         }
310     }
311
312     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
313         private final BigInteger nodeId;
314
315         private CommitActiveBundleCallback(final BigInteger nodeId) {
316             this.nodeId = nodeId;
317         }
318
319         @Override
320         public void onSuccess(RpcResult<?> rpcResult) {
321             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
322             bundleIdMap.remove(nodeId);
323         }
324
325         @Override
326         public void onFailure(Throwable throwable) {
327             LOG.error("Error while performing arbitrator reconciliation for device {}",
328                     nodeId, throwable);
329         }
330     }
331
332     private <D> Function<RpcResult<D>,
333             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
334         return input -> {
335             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
336             if (input != null) {
337                 List<RpcError> errors = new ArrayList<>();
338                 if (!input.isSuccessful()) {
339                     errors.addAll(input.getErrors());
340                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
341                 } else {
342                     resultSink = RpcResultBuilder.success();
343                 }
344             } else {
345                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
346                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
347             }
348             return resultSink.build();
349         };
350     }
351
352     private void registerRpc(DeviceInfo node) {
353         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
354                 .child(Node.class, new NodeKey(node.getNodeId()));
355         LOG.debug("The path is registered : {}", path);
356         routedRpcReg.registerPath(NodeContext.class, path);
357     }
358
359     private void deregisterRpc(DeviceInfo node) {
360         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
361                 new NodeKey(node.getNodeId()));
362         LOG.debug("The path is unregistered : {}", path);
363         routedRpcReg.unregisterPath(NodeContext.class, path);
364     }
365
366     private static class BundleDetails {
367         private final BundleId bundleId;
368         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
369
370         BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
371             this.bundleId = bundleId;
372             this.result = result;
373         }
374
375         public BundleId getBundleId() {
376             return bundleId;
377         }
378
379         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
380             return result;
381         }
382     }
383
384     private BigInteger getDpnIdFromNodeName(String nodeName) {
385         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
386         return new BigInteger(dpnId);
387     }
388
389 }