Merge "OPNFLWPLUG-1087: ODL controller to provide view of openflow node reconciliatio...
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.apache.aries.blueprint.annotation.service.Reference;
34 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
35 import org.opendaylight.mdsal.binding.api.RpcProviderService;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
41 import org.opendaylight.serviceutils.upgrade.UpgradeState;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
79 import org.opendaylight.yangtools.concepts.ObjectRegistration;
80 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
83 import org.opendaylight.yangtools.yang.binding.RpcService;
84 import org.opendaylight.yangtools.yang.common.RpcError;
85 import org.opendaylight.yangtools.yang.common.RpcResult;
86 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
87 import org.opendaylight.yangtools.yang.common.Uint64;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 @Singleton
92 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
93         ReconciliationNotificationListener, AutoCloseable {
94
95     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
96     private static final int THREAD_POOL_SIZE = 4;
97     private static final AtomicLong BUNDLE_ID = new AtomicLong();
98     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
99     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
100             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
101     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
102     private static final String SEPARATOR = ":";
103
104     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
105             .setRemoveFlowCaseData(
106                     new RemoveFlowCaseDataBuilder(new FlowBuilder().setTableId(OFConstants.OFPTT_ALL).build()).build())
107             .build();
108     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
109             .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
110                     .setGroupType(GroupTypes.GroupAll)
111                     .setGroupId(new GroupId(OFConstants.OFPG_ALL))
112                     .build()).build())
113             .build();
114
115     private final SalBundleService salBundleService;
116     private final ReconciliationManager reconciliationManager;
117     private final RpcProviderService rpcProviderService;
118     private final UpgradeState upgradeState;
119     private NotificationRegistration registration;
120     private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
121             Executors.newFixedThreadPool(THREAD_POOL_SIZE));
122     private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
123     private  ObjectRegistration<? extends RpcService> rpcRegistration;
124
125     @Inject
126     public ArbitratorReconciliationManagerImpl(
127             @Reference ReconciliationManager reconciliationManager, @Reference RpcProviderService rpcProviderService,
128             @Reference final RpcConsumerRegistry rpcRegistry, @Reference UpgradeState upgradeState) {
129         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
130         this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
131                 "ReconciliationManager cannot be null!");
132         this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
133                 "RPC SalBundleService not found.");
134         this.rpcProviderService = rpcProviderService;
135         this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
136     }
137
138     @PostConstruct
139     public void start() {
140         registration = reconciliationManager.registerService(this);
141         LOG.info("ArbitratorReconciliationManager has started successfully.");
142     }
143
144     @Override
145     @PreDestroy
146     public void close() throws Exception {
147         executor.shutdown();
148         if (registration != null) {
149             registration.close();
150             registration = null;
151         }
152     }
153
154     @Override
155     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
156             CommitActiveBundleInput input) {
157         Uint64 nodeId = input.getNodeId();
158         if (bundleIdMap.containsKey(nodeId)) {
159             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
160             if (bundleId != null) {
161                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
162                         .setNode(input.getNode())
163                         .setBundleId(bundleId)
164                         .setFlags(BUNDLE_FLAGS)
165                         .setType(BundleControlType.ONFBCTCOMMITREQUEST)
166                         .build();
167                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
168                         .controlBundle(commitBundleInput);
169                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
170
171                 Futures.addCallback(rpcResult,
172                         new CommitActiveBundleCallback(nodeId),
173                         MoreExecutors.directExecutor());
174                 return Futures.transform(
175                         rpcResult,
176                         this.createRpcResultCondenser("committed active bundle"),
177                         MoreExecutors.directExecutor());
178             }
179         }
180         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
181                 .setResult(null).build())
182                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
183                         null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
184     }
185
186     @Override
187     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
188         Uint64 nodeId = input.getNodeId();
189         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
190         if (bundleDetails != null) {
191             try {
192                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
193                 // pipeline when the commit bundle is ongoing.
194                 bundleDetails.getResult().get();
195                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
196                         .setResult(bundleDetails.getBundleId())
197                         .build())
198                         .buildFuture();
199             } catch (InterruptedException | ExecutionException | NullPointerException e) {
200                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
201                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
202                                 null, e.getMessage()))).buildFuture();
203             }
204         }
205         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
206                 .setResult(null).build()).buildFuture();
207     }
208
209     @Override
210     public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
211         registerRpc(node);
212         if (upgradeState.isUpgradeInProgress()) {
213             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
214             return reconcileConfiguration(node);
215         }
216         LOG.trace("arbitrator reconciliation is disabled");
217         return FluentFutures.immediateTrueFluentFuture();
218     }
219
220     @Override
221     public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
222         Uint64 datapathId = node.getDatapathId();
223         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
224         bundleIdMap.remove(datapathId);
225         deregisterRpc(node);
226         return FluentFutures.immediateTrueFluentFuture();
227     }
228
229     @Override
230     public int getPriority() {
231         return ARBITRATOR_RECONCILIATION_PRIORITY;
232     }
233
234     @Override
235     public String getName() {
236         return SERVICE_NAME;
237     }
238
239     @Override
240     public ResultState getResultState() {
241         return ResultState.DONOTHING;
242     }
243
244     private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
245         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
246         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
247         return executor.submit(upgradeReconTask);
248     }
249
250     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
251             justification = "https://github.com/spotbugs/spotbugs/issues/811")
252     private Messages createMessages(final NodeRef nodeRef) {
253         final List<Message> messages = new ArrayList<>();
254         messages.add(new MessageBuilder()
255                 .setNode(nodeRef)
256                 .setBundleInnerMessage(DELETE_ALL_FLOW).build());
257         messages.add(new MessageBuilder()
258                 .setNode(nodeRef)
259                 .setBundleInnerMessage(DELETE_ALL_GROUP).build());
260         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
261         return new MessagesBuilder().setMessage(messages).build();
262     }
263
264     private class ArbitratorReconciliationTask implements Callable<Boolean> {
265         final DeviceInfo deviceInfo;
266
267         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
268             this.deviceInfo = deviceInfo;
269         }
270
271         @Override
272         public Boolean call() {
273             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
274                     .augmentation(FlowCapableNode.class);
275             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
276             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
277             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
278             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
279
280             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
281                     .setNode(nodeRef)
282                     .setBundleId(bundleIdValue)
283                     .setFlags(BUNDLE_FLAGS)
284                     .setType(BundleControlType.ONFBCTCLOSEREQUEST)
285                     .build();
286
287             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
288                     .setNode(nodeRef)
289                     .setBundleId(bundleIdValue)
290                     .setFlags(BUNDLE_FLAGS)
291                     .setType(BundleControlType.ONFBCTOPENREQUEST)
292                     .build();
293
294             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
295                     .setNode(nodeRef)
296                     .setBundleId(bundleIdValue)
297                     .setFlags(BUNDLE_FLAGS)
298                     .setMessages(createMessages(nodeRef))
299                     .build();
300
301             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
302                     .controlBundle(closeBundleInput);
303
304             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
305                     .transformAsync(closeBundle, rpcResult -> salBundleService
306                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
307
308             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
309                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
310                         if (rpcResult.isSuccessful()) {
311                             return salBundleService
312                                     .addBundleMessages(addBundleMessagesInput);
313                         }
314                         return FluentFutures.immediateNullFluentFuture();
315                     }, MoreExecutors.directExecutor());
316             Uint64 nodeId = getDpnIdFromNodeName(node);
317             try {
318                 if (addBundleMessagesFuture.get().isSuccessful()) {
319                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
320                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
321                     return true;
322                 } else {
323                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
324                     return false;
325                 }
326             } catch (InterruptedException | ExecutionException e) {
327                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
328                 return false;
329             }
330         }
331     }
332
333     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
334         private final Uint64 nodeId;
335
336         private CommitActiveBundleCallback(final Uint64 nodeId) {
337             this.nodeId = nodeId;
338         }
339
340         @Override
341         public void onSuccess(RpcResult<?> rpcResult) {
342             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
343             bundleIdMap.remove(nodeId);
344         }
345
346         @Override
347         public void onFailure(Throwable throwable) {
348             LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
349         }
350     }
351
352     private <D> Function<RpcResult<D>,
353             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
354         return input -> {
355             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
356             if (input != null) {
357                 List<RpcError> errors = new ArrayList<>();
358                 if (!input.isSuccessful()) {
359                     errors.addAll(input.getErrors());
360                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
361                 } else {
362                     resultSink = RpcResultBuilder.success();
363                 }
364             } else {
365                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
366                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
367             }
368             return resultSink.build();
369         };
370     }
371
372     private void registerRpc(DeviceInfo node) {
373         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
374                 .child(Node.class, new NodeKey(node.getNodeId()));
375         LOG.debug("The path is registered : {}", path);
376         rpcRegistration = rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
377                 this, ImmutableSet.of(path));
378     }
379
380     private void deregisterRpc(DeviceInfo node) {
381         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
382                 .child(Node.class, new NodeKey(node.getNodeId()));
383         LOG.debug("The path is unregistered : {}", path);
384         if (rpcRegistration != null) {
385             rpcRegistration.close();
386         }
387     }
388
389     private static class BundleDetails {
390         private final BundleId bundleId;
391         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
392
393         BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
394             this.bundleId = bundleId;
395             this.result = result;
396         }
397
398         public BundleId getBundleId() {
399             return bundleId;
400         }
401
402         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
403             return result;
404         }
405     }
406
407     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
408             justification = "https://github.com/spotbugs/spotbugs/issues/811")
409     private Uint64 getDpnIdFromNodeName(String nodeName) {
410         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
411         return Uint64.valueOf(dpnId);
412     }
413 }