Merge changes from topic "odlparent-6.0.1"
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.atomic.AtomicLong;
28 import javax.annotation.PostConstruct;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.apache.aries.blueprint.annotation.service.Reference;
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
34 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
38 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
40 import org.opendaylight.serviceutils.upgrade.UpgradeState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
79 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
80 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
85 import org.opendaylight.yangtools.yang.common.Uint64;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
88
89 @Singleton
90 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
91         ReconciliationNotificationListener, AutoCloseable {
92
93     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
94     private static final int THREAD_POOL_SIZE = 4;
95     private static final AtomicLong BUNDLE_ID = new AtomicLong();
96     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
97     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
98             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
99     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
100     private static final String SEPARATOR = ":";
101
102     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
103             .setRemoveFlowCaseData(
104                 new RemoveFlowCaseDataBuilder(new FlowBuilder().setTableId(OFConstants.OFPTT_ALL).build()).build())
105             .build();
106     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
107             .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
108                 .setGroupType(GroupTypes.GroupAll)
109                 .setGroupId(new GroupId(OFConstants.OFPG_ALL))
110                 .build()).build())
111             .build();
112
113     private final SalBundleService salBundleService;
114     private final ReconciliationManager reconciliationManager;
115     private final RoutedRpcRegistration routedRpcReg;
116     private final UpgradeState upgradeState;
117     private NotificationRegistration registration;
118     private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
119             Executors.newFixedThreadPool(THREAD_POOL_SIZE));
120     private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
121
122     @Inject
123     public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
124             @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
125         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
126         this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
127                 "ReconciliationManager cannot be null!");
128         this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
129                 "RPC SalBundlService not found.");
130         this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
131                 this);
132         this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
133     }
134
135     @PostConstruct
136     public void start() {
137         registration = reconciliationManager.registerService(this);
138         LOG.info("ArbitratorReconciliationManager has started successfully.");
139     }
140
141     @Override
142     @PreDestroy
143     public void close() throws Exception {
144         executor.shutdown();
145         if (registration != null) {
146             registration.close();
147             registration = null;
148         }
149     }
150
151     @Override
152     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
153             CommitActiveBundleInput input) {
154         Uint64 nodeId = input.getNodeId();
155         if (bundleIdMap.containsKey(nodeId)) {
156             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
157             if (bundleId != null) {
158                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
159                         .setNode(input.getNode()).setBundleId(bundleId)
160                         .setFlags(BUNDLE_FLAGS)
161                         .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
162                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
163                         .controlBundle(commitBundleInput);
164                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
165                 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
166                         MoreExecutors.directExecutor());
167                 return Futures.transform(
168                         rpcResult,
169                         this.createRpcResultCondenser("committed active bundle"),
170                         MoreExecutors.directExecutor());
171             }
172         }
173         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
174                 .setResult(null).build())
175                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
176                 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
177     }
178
179     @Override
180     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
181         Uint64 nodeId = input.getNodeId();
182         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
183         if (bundleDetails != null) {
184             try {
185                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
186                 // pipeline when the commit bundle is ongoing.
187                 bundleDetails.getResult().get();
188                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
189                         .setResult(bundleDetails.getBundleId()).build()).buildFuture();
190             } catch (InterruptedException | ExecutionException | NullPointerException e) {
191                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
192                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
193                                 null, e.getMessage()))).buildFuture();
194             }
195         }
196         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
197                 .setResult(null).build()).buildFuture();
198     }
199
200     @Override
201     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
202         registerRpc(node);
203         if (upgradeState.isUpgradeInProgress()) {
204             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
205             return reconcileConfiguration(node);
206         }
207         LOG.trace("arbitrator reconciliation is disabled");
208         return FluentFutures.immediateTrueFluentFuture();
209     }
210
211     @Override
212     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
213         Uint64 datapathId = node.getDatapathId();
214         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
215         bundleIdMap.remove(datapathId);
216         deregisterRpc(node);
217         return FluentFutures.immediateTrueFluentFuture();
218     }
219
220     @Override
221     public int getPriority() {
222         return ARBITRATOR_RECONCILIATION_PRIORITY;
223     }
224
225     @Override
226     public String getName() {
227         return SERVICE_NAME;
228     }
229
230     @Override
231     public ResultState getResultState() {
232         return ResultState.DONOTHING;
233     }
234
235     private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
236         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
237         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
238         return executor.submit(upgradeReconTask);
239     }
240
241     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
242             justification = "https://github.com/spotbugs/spotbugs/issues/811")
243     private Messages createMessages(final NodeRef nodeRef) {
244         final List<Message> messages = new ArrayList<>();
245         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build());
246         messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build());
247         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
248         return new MessagesBuilder().setMessage(messages).build();
249     }
250
251     private class ArbitratorReconciliationTask implements Callable<Boolean> {
252         final DeviceInfo deviceInfo;
253
254         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
255             this.deviceInfo = deviceInfo;
256         }
257
258         @Override
259         public Boolean call() {
260             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
261                     .augmentation(FlowCapableNode.class);
262             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
263             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
264             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
265             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
266
267             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
268                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
269                     .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
270
271             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
272                     .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
273                     .setType(BundleControlType.ONFBCTOPENREQUEST).build();
274
275             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
276                     .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
277                     .setMessages(createMessages(nodeRef)).build();
278
279             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
280                     .controlBundle(closeBundleInput);
281
282             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
283                     .transformAsync(closeBundle, rpcResult -> salBundleService
284                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
285
286             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
287                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
288                         if (rpcResult.isSuccessful()) {
289                             return salBundleService
290                                     .addBundleMessages(addBundleMessagesInput);
291                         }
292                         return FluentFutures.immediateNullFluentFuture();
293                     }, MoreExecutors.directExecutor());
294             Uint64 nodeId = getDpnIdFromNodeName(node);
295             try {
296                 if (addBundleMessagesFuture.get().isSuccessful()) {
297                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
298                         FluentFutures.immediateNullFluentFuture()));
299                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
300                             + " for application programming.", nodeId);
301                     return true;
302                 } else {
303                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
304                     return false;
305                 }
306             } catch (InterruptedException | ExecutionException e) {
307                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
308                 return false;
309             }
310         }
311     }
312
313     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
314         private final Uint64 nodeId;
315
316         private CommitActiveBundleCallback(final Uint64 nodeId) {
317             this.nodeId = nodeId;
318         }
319
320         @Override
321         public void onSuccess(RpcResult<?> rpcResult) {
322             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
323             bundleIdMap.remove(nodeId);
324         }
325
326         @Override
327         public void onFailure(Throwable throwable) {
328             LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
329         }
330     }
331
332     private <D> Function<RpcResult<D>,
333             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
334         return input -> {
335             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
336             if (input != null) {
337                 List<RpcError> errors = new ArrayList<>();
338                 if (!input.isSuccessful()) {
339                     errors.addAll(input.getErrors());
340                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
341                 } else {
342                     resultSink = RpcResultBuilder.success();
343                 }
344             } else {
345                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
346                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
347             }
348             return resultSink.build();
349         };
350     }
351
352     private void registerRpc(DeviceInfo node) {
353         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
354                 .child(Node.class, new NodeKey(node.getNodeId()));
355         LOG.debug("The path is registered : {}", path);
356         routedRpcReg.registerPath(NodeContext.class, path);
357     }
358
359     private void deregisterRpc(DeviceInfo node) {
360         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
361                 new NodeKey(node.getNodeId()));
362         LOG.debug("The path is unregistered : {}", path);
363         routedRpcReg.unregisterPath(NodeContext.class, path);
364     }
365
366     private static class BundleDetails {
367         private final BundleId bundleId;
368         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
369
370         BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
371             this.bundleId = bundleId;
372             this.result = result;
373         }
374
375         public BundleId getBundleId() {
376             return bundleId;
377         }
378
379         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
380             return result;
381         }
382     }
383
384     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
385             justification = "https://github.com/spotbugs/spotbugs/issues/811")
386     private Uint64 getDpnIdFromNodeName(String nodeName) {
387         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
388         return Uint64.valueOf(dpnId);
389     }
390 }