Merge "OPNFLWPLUG-1005 : Implementation of arbitrator reconciliation"
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18
19 import java.math.BigInteger;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
32 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
33 import org.opendaylight.openflowplugin.api.OFConstants;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
35 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
36 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
37 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
38 import org.opendaylight.serviceutils.upgrade.UpgradeState;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
79 import org.opendaylight.yangtools.yang.common.RpcError;
80 import org.opendaylight.yangtools.yang.common.RpcResult;
81 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
82 import org.slf4j.Logger;
83 import org.slf4j.LoggerFactory;
84
85 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
86         ReconciliationNotificationListener, AutoCloseable {
87
88     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
89     private static final int THREAD_POOL_SIZE = 4;
90     private static final AtomicLong BUNDLE_ID = new AtomicLong();
91     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
92     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
93             .getInteger("arbitrator.reconciliation.manager.priority", 1/*default*/);
94     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
95     private static final String SEPARATOR = ":";
96
97     private final SalBundleService salBundleService;
98     private final ReconciliationManager reconciliationManager;
99     private final RoutedRpcRegistration routedRpcReg;
100     private final UpgradeState upgradeState;
101     private NotificationRegistration registration;
102     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
103     private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
104
105     public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry,
106             final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) {
107         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
108         this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
109                 "ReconciliationManager cannot be null!");
110         this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
111                 "RPC SalBundlService not found.");
112         this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
113                 this);
114         this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
115     }
116
117     public void start() {
118         registration = reconciliationManager.registerService(this);
119         LOG.info("ArbitratorReconciliationManager has started successfully.");
120     }
121
122     @Override
123     public void close() throws Exception {
124         executor.shutdown();
125         registration.close();
126
127     }
128
129     @Override
130     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
131             CommitActiveBundleInput input) {
132         BigInteger nodeId = input.getNodeId();
133         if (bundleIdMap.containsKey(nodeId)) {
134             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
135             if (bundleId != null) {
136                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
137                         .setNode(input.getNode()).setBundleId(bundleId)
138                         .setFlags(BUNDLE_FLAGS)
139                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
140                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
141                         .controlBundle(commitBundleInput);
142                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
143                 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
144                         MoreExecutors.directExecutor());
145                 return Futures.transform(
146                         rpcResult,
147                         this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
148                         MoreExecutors.directExecutor());
149             }
150         }
151         return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder()
152                 .setResult(null).build()))
153                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
154                 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
155     }
156
157     @Override
158     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
159         BigInteger nodeId = input.getNodeId();
160         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
161         if (bundleDetails != null) {
162             try {
163                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
164                 // pipeline when the commit bundle is ongoing.
165                 bundleDetails.getResult().get();
166                 return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
167                         .setResult(bundleDetails.getBundleId()).build())).buildFuture();
168             } catch (InterruptedException | ExecutionException | NullPointerException e) {
169                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
170                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
171                                 null, e.getMessage()))).buildFuture();
172             }
173         }
174         return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
175                 .setResult(null).build())).buildFuture();
176     }
177
178     @Override
179     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
180         registerRpc(node);
181         if (upgradeState.isUpgradeInProgress()) {
182             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
183             return reconcileConfiguration(node);
184         }
185         LOG.trace("arbitrator reconciliation is disabled");
186         return Futures.immediateFuture(true);
187     }
188
189     @Override
190     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
191         LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
192         InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
193                 .augmentation(FlowCapableNode.class);
194         bundleIdMap.remove(connectedNode);
195         deregisterRpc(node);
196         return Futures.immediateFuture(true);
197     }
198
199     @Override
200     public int getPriority() {
201         return ARBITRATOR_RECONCILIATION_PRIORITY;
202     }
203
204     @Override
205     public String getName() {
206         return SERVICE_NAME;
207     }
208
209     @Override
210     public ResultState getResultState() {
211         return ResultState.DONOTHING;
212     }
213
214     private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
215         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
216         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
217         return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
218     }
219
220     private Messages createMessages(final NodeRef nodeRef) {
221         final List<Message> messages = new ArrayList<>();
222         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
223                 new BundleRemoveFlowCaseBuilder()
224                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
225                 .build()).build());
226
227         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
228                 new BundleRemoveGroupCaseBuilder()
229                 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
230                 .build()).build());
231         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
232         return new MessagesBuilder().setMessage(messages).build();
233     }
234
235     private Flow getDeleteAllFlow() {
236         final FlowBuilder flowBuilder = new FlowBuilder();
237         flowBuilder.setTableId(OFConstants.OFPTT_ALL);
238         return flowBuilder.build();
239     }
240
241     private Group getDeleteAllGroup() {
242         final GroupBuilder groupBuilder = new GroupBuilder();
243         groupBuilder.setGroupType(GroupTypes.GroupAll);
244         groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
245         return groupBuilder.build();
246     }
247
248     private class ArbitratorReconciliationTask implements Callable<Boolean> {
249         final DeviceInfo deviceInfo;
250
251         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
252             this.deviceInfo = deviceInfo;
253         }
254
255         @Override
256         public Boolean call() {
257             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
258                     .augmentation(FlowCapableNode.class);
259             String node = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
260             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
261             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
262             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
263
264             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
265                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
266                     .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
267
268             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
269                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
270                     .setType(BundleControlType.ONFBCTOPENREQUEST).build();
271
272             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
273                     .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
274                     .setMessages(createMessages(nodeRef)).build();
275
276             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
277                     .controlBundle(closeBundleInput);
278
279             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
280                     .transformAsync(closeBundle, rpcResult -> salBundleService
281                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
282
283             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
284                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
285                         if (rpcResult.isSuccessful()) {
286                             return salBundleService
287                                     .addBundleMessages(addBundleMessagesInput);
288                         }
289                         return Futures.immediateFuture(null);
290                     }, MoreExecutors.directExecutor());
291             BigInteger nodeId = getDpnIdFromNodeName(node);
292             try {
293                 if (addBundleMessagesFuture.get().isSuccessful()) {
294                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
295                             Futures.immediateFuture(null)));
296                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
297                             + " for application programming.", nodeId);
298                     return true;
299                 } else {
300                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
301                     return false;
302                 }
303             } catch (InterruptedException | ExecutionException e) {
304                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
305                 return false;
306             }
307         }
308     }
309
310     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
311         private final BigInteger nodeId;
312
313         private CommitActiveBundleCallback(final BigInteger nodeId) {
314             this.nodeId = nodeId;
315         }
316
317         @Override
318         public void onSuccess(RpcResult<?> rpcResult) {
319             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
320             bundleIdMap.remove(nodeId);
321         }
322
323         @Override
324         public void onFailure(Throwable throwable) {
325             LOG.error("Error while performing arbitrator reconciliation for device {}",
326                     nodeId, throwable);
327         }
328     }
329
330     private <D> Function<RpcResult<D>,
331             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
332         return input -> {
333             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
334             if (input != null) {
335                 List<RpcError> errors = new ArrayList<>();
336                 if (!input.isSuccessful()) {
337                     errors.addAll(input.getErrors());
338                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
339                 } else {
340                     resultSink = RpcResultBuilder.success();
341                 }
342             } else {
343                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
344                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
345             }
346             return resultSink.build();
347         };
348     }
349
350     private void registerRpc(DeviceInfo node) {
351         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
352                 .child(Node.class, new NodeKey(node.getNodeId()));
353         LOG.debug("The path is registered : {}", path);
354         routedRpcReg.registerPath(NodeContext.class, path);
355     }
356
357     private void deregisterRpc(DeviceInfo node) {
358         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
359                 new NodeKey(node.getNodeId()));
360         LOG.debug("The path is unregistered : {}", path);
361         routedRpcReg.unregisterPath(NodeContext.class, path);
362     }
363
364     private static class BundleDetails {
365         private final BundleId bundleId;
366         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
367
368         BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
369             this.bundleId = bundleId;
370             this.result = result;
371         }
372
373         public BundleId getBundleId() {
374             return bundleId;
375         }
376
377         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
378             return result;
379         }
380     }
381
382     private BigInteger getDpnIdFromNodeName(String nodeName) {
383         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
384         return new BigInteger(dpnId);
385     }
386
387 }