Bump MRI upstreams
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
34 import org.opendaylight.mdsal.binding.api.RpcProviderService;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
38 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
40 import org.opendaylight.serviceutils.upgrade.UpgradeState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
77 import org.opendaylight.yangtools.concepts.ObjectRegistration;
78 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
79 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
80 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.RpcService;
82 import org.opendaylight.yangtools.yang.common.ErrorType;
83 import org.opendaylight.yangtools.yang.common.RpcError;
84 import org.opendaylight.yangtools.yang.common.RpcResult;
85 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
86 import org.opendaylight.yangtools.yang.common.Uint32;
87 import org.opendaylight.yangtools.yang.common.Uint64;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
90
91 @Singleton
92 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
93         ReconciliationNotificationListener, AutoCloseable {
94
95     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
96     private static final int THREAD_POOL_SIZE = 4;
97     private static final AtomicLong BUNDLE_ID = new AtomicLong();
98     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
99     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
100             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
101     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
102     private static final String SEPARATOR = ":";
103
104     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
105             .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
106                 .setTableId(OFConstants.OFPTT_ALL)
107                 .build())
108             .build();
109     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
110             .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
111                     .setGroupType(GroupTypes.GroupAll)
112                     .setGroupId(new GroupId(OFConstants.OFPG_ALL))
113                     .build()).build())
114             .build();
115
116     private final SalBundleService salBundleService;
117     private final ReconciliationManager reconciliationManager;
118     private final RpcProviderService rpcProviderService;
119     private final UpgradeState upgradeState;
120     private NotificationRegistration registration;
121     private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
122             Executors.newFixedThreadPool(THREAD_POOL_SIZE));
123     private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
124     private final ConcurrentMap<String,
125             ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
126
127     @Inject
128     public ArbitratorReconciliationManagerImpl(final ReconciliationManager reconciliationManager,
129             final RpcProviderService rpcProviderService, final RpcConsumerRegistry rpcRegistry,
130             final UpgradeState upgradeState) {
131         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
132         this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!");
133         salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
134                 "RPC SalBundleService not found.");
135         this.rpcProviderService = rpcProviderService;
136         this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!");
137     }
138
139     @PostConstruct
140     public void start() {
141         registration = reconciliationManager.registerService(this);
142         LOG.info("ArbitratorReconciliationManager has started successfully.");
143     }
144
145     @Override
146     @PreDestroy
147     public void close() throws Exception {
148         executor.shutdown();
149         if (registration != null) {
150             registration.close();
151             registration = null;
152         }
153     }
154
155     @Override
156     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
157             final CommitActiveBundleInput input) {
158         Uint64 nodeId = input.getNodeId();
159         if (bundleIdMap.containsKey(nodeId)) {
160             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
161             if (bundleId != null) {
162                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
163                         .setNode(input.getNode())
164                         .setBundleId(bundleId)
165                         .setFlags(BUNDLE_FLAGS)
166                         .setType(BundleControlType.ONFBCTCOMMITREQUEST)
167                         .build();
168                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
169                         .controlBundle(commitBundleInput);
170                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
171
172                 Futures.addCallback(rpcResult,
173                         new CommitActiveBundleCallback(nodeId),
174                         MoreExecutors.directExecutor());
175                 return Futures.transform(
176                         rpcResult,
177                         createRpcResultCondenser("committed active bundle"),
178                         MoreExecutors.directExecutor());
179             }
180         }
181         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
182                 .setResult(null).build())
183                 .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
184                         null, "No active bundle found for the node" + nodeId))).buildFuture();
185     }
186
187     @Override
188     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
189         Uint64 nodeId = input.getNodeId();
190         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
191         if (bundleDetails != null) {
192             try {
193                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
194                 // pipeline when the commit bundle is ongoing.
195                 bundleDetails.getResult().get();
196                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
197                         .setResult(bundleDetails.getBundleId())
198                         .build())
199                         .buildFuture();
200             } catch (InterruptedException | ExecutionException e) {
201                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
202                         .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
203                                 null, e.getMessage()))).buildFuture();
204             }
205         }
206         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
207                 .setResult(null).build()).buildFuture();
208     }
209
210     @Override
211     public ListenableFuture<Boolean> startReconciliation(final DeviceInfo node) {
212         registerRpc(node);
213         if (upgradeState.isUpgradeInProgress()) {
214             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
215             return reconcileConfiguration(node);
216         }
217         LOG.trace("arbitrator reconciliation is disabled");
218         return FluentFutures.immediateTrueFluentFuture();
219     }
220
221     @Override
222     public ListenableFuture<Boolean> endReconciliation(final DeviceInfo node) {
223         Uint64 datapathId = node.getDatapathId();
224         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
225         bundleIdMap.remove(datapathId);
226         deregisterRpc(node);
227         return FluentFutures.immediateTrueFluentFuture();
228     }
229
230     @Override
231     public int getPriority() {
232         return ARBITRATOR_RECONCILIATION_PRIORITY;
233     }
234
235     @Override
236     public String getName() {
237         return SERVICE_NAME;
238     }
239
240     @Override
241     public ResultState getResultState() {
242         return ResultState.DONOTHING;
243     }
244
245     private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
246         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
247         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
248         return executor.submit(upgradeReconTask);
249     }
250
251     private static Messages createMessages(final NodeRef nodeRef) {
252         final List<Message> messages = new ArrayList<>();
253         messages.add(new MessageBuilder()
254                 .setNode(nodeRef)
255                 .setBundleInnerMessage(DELETE_ALL_FLOW).build());
256         messages.add(new MessageBuilder()
257                 .setNode(nodeRef)
258                 .setBundleInnerMessage(DELETE_ALL_GROUP).build());
259         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
260         return new MessagesBuilder().setMessage(messages).build();
261     }
262
263     private class ArbitratorReconciliationTask implements Callable<Boolean> {
264         final DeviceInfo deviceInfo;
265
266         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
267             this.deviceInfo = deviceInfo;
268         }
269
270         @Override
271         public Boolean call() {
272             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
273                     .augmentation(FlowCapableNode.class);
274             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
275             BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
276             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
277             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
278
279             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
280                     .setNode(nodeRef)
281                     .setBundleId(bundleIdValue)
282                     .setFlags(BUNDLE_FLAGS)
283                     .setType(BundleControlType.ONFBCTCLOSEREQUEST)
284                     .build();
285
286             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
287                     .setNode(nodeRef)
288                     .setBundleId(bundleIdValue)
289                     .setFlags(BUNDLE_FLAGS)
290                     .setType(BundleControlType.ONFBCTOPENREQUEST)
291                     .build();
292
293             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
294                     .setNode(nodeRef)
295                     .setBundleId(bundleIdValue)
296                     .setFlags(BUNDLE_FLAGS)
297                     .setMessages(createMessages(nodeRef))
298                     .build();
299
300             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
301                     .controlBundle(closeBundleInput);
302
303             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
304                     .transformAsync(closeBundle, rpcResult -> salBundleService
305                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
306
307             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
308                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
309                         if (rpcResult.isSuccessful()) {
310                             return salBundleService
311                                     .addBundleMessages(addBundleMessagesInput);
312                         }
313                         return FluentFutures.immediateNullFluentFuture();
314                     }, MoreExecutors.directExecutor());
315             Uint64 nodeId = getDpnIdFromNodeName(node);
316             try {
317                 if (addBundleMessagesFuture.get().isSuccessful()) {
318                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
319                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
320                     return true;
321                 } else {
322                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
323                     return false;
324                 }
325             } catch (InterruptedException | ExecutionException e) {
326                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
327                 return false;
328             }
329         }
330     }
331
332     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
333         private final Uint64 nodeId;
334
335         private CommitActiveBundleCallback(final Uint64 nodeId) {
336             this.nodeId = nodeId;
337         }
338
339         @Override
340         public void onSuccess(final RpcResult<?> rpcResult) {
341             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
342             bundleIdMap.remove(nodeId);
343         }
344
345         @Override
346         public void onFailure(final Throwable throwable) {
347             LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
348         }
349     }
350
351     private static <D> Function<RpcResult<D>,
352             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
353         return input -> {
354             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
355             if (input != null) {
356                 List<RpcError> errors = new ArrayList<>();
357                 if (!input.isSuccessful()) {
358                     errors.addAll(input.getErrors());
359                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
360                 } else {
361                     resultSink = RpcResultBuilder.success();
362                 }
363             } else {
364                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
365                         .withError(ErrorType.APPLICATION, "action of " + action + " failed");
366             }
367             return resultSink.build();
368         };
369     }
370
371     private void registerRpc(final DeviceInfo node) {
372         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
373                 .child(Node.class, new NodeKey(node.getNodeId()));
374         LOG.debug("The path is registered : {}", path);
375         ObjectRegistration<? extends RpcService> rpcRegistration =
376                 rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class, this, Set.of(path));
377         rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
378     }
379
380     private void deregisterRpc(final DeviceInfo node) {
381         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
382                 .child(Node.class, new NodeKey(node.getNodeId()));
383         LOG.debug("The path is unregistered : {}", path);
384         ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
385         if (rpcRegistration != null) {
386             rpcRegistration.close();
387             rpcRegistrations.remove(node.getNodeId().getValue());
388         }
389     }
390
391     private static class BundleDetails {
392         private final BundleId bundleId;
393         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
394
395         BundleDetails(final BundleId bundleId, final ListenableFuture<RpcResult<ControlBundleOutput>> result) {
396             this.bundleId = bundleId;
397             this.result = result;
398         }
399
400         public BundleId getBundleId() {
401             return bundleId;
402         }
403
404         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
405             return result;
406         }
407     }
408
409     private static Uint64 getDpnIdFromNodeName(final String nodeName) {
410         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
411         return Uint64.valueOf(dpnId);
412     }
413 }