Convert arbitratorreconciliation to OSGi DS
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Function;
13 import com.google.common.collect.ImmutableClassToInstanceMap;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
34 import org.opendaylight.mdsal.binding.api.RpcProviderService;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
38 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
40 import org.opendaylight.serviceutils.upgrade.UpgradeState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessages;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundle;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundle;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundle;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
75 import org.opendaylight.yangtools.concepts.Registration;
76 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.binding.Rpc;
79 import org.opendaylight.yangtools.yang.common.ErrorType;
80 import org.opendaylight.yangtools.yang.common.RpcError;
81 import org.opendaylight.yangtools.yang.common.RpcResult;
82 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
83 import org.opendaylight.yangtools.yang.common.Uint32;
84 import org.opendaylight.yangtools.yang.common.Uint64;
85 import org.osgi.service.component.annotations.Activate;
86 import org.osgi.service.component.annotations.Component;
87 import org.osgi.service.component.annotations.Deactivate;
88 import org.osgi.service.component.annotations.Reference;
89 import org.slf4j.Logger;
90 import org.slf4j.LoggerFactory;
91
92 @Singleton
93 @Component(service = { })
94 public final class ArbitratorReconciliationManagerImpl implements ReconciliationNotificationListener, AutoCloseable {
95     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
96     private static final AtomicLong BUNDLE_ID = new AtomicLong();
97     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
98     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
99     private static final String SEPARATOR = ":";
100
101     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
102         .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
103             .setTableId(OFConstants.OFPTT_ALL)
104             .build())
105         .build();
106     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
107         .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
108             .setGroupType(GroupTypes.GroupAll)
109             .setGroupId(new GroupId(OFConstants.OFPG_ALL))
110             .build()).build())
111         .build();
112
113     // FIXME: use CM to control this constant
114     private static final int THREAD_POOL_SIZE = 4;
115     // FIXME: use CM to control this constant
116     private static final int ARBITRATOR_RECONCILIATION_PRIORITY =
117         Integer.getInteger("arbitrator.reconciliation.manager.priority", 0 /*default*/);
118
119     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
120     private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
121     private final ConcurrentMap<String, Registration> rpcRegistrations = new ConcurrentHashMap<>();
122
123     private final RpcProviderService rpcProviderService;
124     private final UpgradeState upgradeState;
125     private final NotificationRegistration registration;
126     private final AddBundleMessages addBundleMessages;
127     private final ControlBundle controlBundle;
128
129     @Inject
130     @Activate
131     public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager,
132             @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcService,
133             @Reference final UpgradeState upgradeState) {
134         this.rpcProviderService = requireNonNull(rpcProviderService);
135         this.upgradeState = requireNonNull(upgradeState);
136         addBundleMessages = requireNonNull(rpcService.getRpc(AddBundleMessages.class));
137         controlBundle = requireNonNull(rpcService.getRpc(ControlBundle.class));
138         registration = reconciliationManager.registerService(this);
139         LOG.info("ArbitratorReconciliationManager has started successfully.");
140     }
141
142     @Deactivate
143     @PreDestroy
144     @Override
145     public void close() throws Exception {
146         executor.shutdown();
147         registration.close();
148     }
149
150     private ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
151             final CommitActiveBundleInput input) {
152         final var nodeId = input.getNodeId();
153         final var details = bundleIdMap.get(nodeId);
154         if (details != null) {
155             final var rpcResult = controlBundle.invoke(new ControlBundleInputBuilder()
156                 .setNode(input.getNode())
157                 .setBundleId(details.bundleId)
158                 .setFlags(BUNDLE_FLAGS)
159                 .setType(BundleControlType.ONFBCTCOMMITREQUEST)
160                 .build());
161             bundleIdMap.put(nodeId, new BundleDetails(details.bundleId, rpcResult));
162
163             Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor());
164             return Futures.transform(rpcResult, createRpcResultCondenser("committed active bundle"),
165                 MoreExecutors.directExecutor());
166         }
167         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
168                 .setResult(null).build())
169                 .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
170                         null, "No active bundle found for the node" + nodeId))).buildFuture();
171     }
172
173     private ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
174         Uint64 nodeId = input.getNodeId();
175         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
176         if (bundleDetails != null) {
177             try {
178                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
179                 // pipeline when the commit bundle is ongoing.
180                 bundleDetails.result.get();
181                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
182                         .setResult(bundleDetails.bundleId)
183                         .build())
184                         .buildFuture();
185             } catch (InterruptedException | ExecutionException e) {
186                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
187                         .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
188                                 null, e.getMessage()))).buildFuture();
189             }
190         }
191         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder().setResult(null).build()).buildFuture();
192     }
193
194     @Override
195     public ListenableFuture<Boolean> startReconciliation(final DeviceInfo node) {
196         registerRpc(node);
197         if (upgradeState.isUpgradeInProgress()) {
198             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
199             return reconcileConfiguration(node);
200         }
201         LOG.trace("arbitrator reconciliation is disabled");
202         return FluentFutures.immediateTrueFluentFuture();
203     }
204
205     @Override
206     public ListenableFuture<Boolean> endReconciliation(final DeviceInfo node) {
207         Uint64 datapathId = node.getDatapathId();
208         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
209         bundleIdMap.remove(datapathId);
210         deregisterRpc(node);
211         return FluentFutures.immediateTrueFluentFuture();
212     }
213
214     @Override
215     public int getPriority() {
216         return ARBITRATOR_RECONCILIATION_PRIORITY;
217     }
218
219     @Override
220     public String getName() {
221         return SERVICE_NAME;
222     }
223
224     @Override
225     public ResultState getResultState() {
226         return ResultState.DONOTHING;
227     }
228
229
230     private static Messages createMessages(final NodeRef nodeRef) {
231         final var messages = List.of(
232             new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build(),
233             new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build());
234         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
235         return new MessagesBuilder().setMessage(messages).build();
236     }
237
238     private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
239         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
240         return Futures.submit(new ArbitratorReconciliationTask(node), executor);
241     }
242
243     private final class ArbitratorReconciliationTask implements Callable<Boolean> {
244         private final DeviceInfo deviceInfo;
245
246         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
247             this.deviceInfo = requireNonNull(deviceInfo);
248         }
249
250         @Override
251         public Boolean call() {
252             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
253                     .augmentation(FlowCapableNode.class);
254             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
255             final var bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
256             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
257             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
258
259             final var openBundleMessagesFuture = Futures.transformAsync(
260                 controlBundle.invoke(new ControlBundleInputBuilder()
261                     .setNode(nodeRef)
262                     .setBundleId(bundleIdValue)
263                     .setFlags(BUNDLE_FLAGS)
264                     .setType(BundleControlType.ONFBCTCLOSEREQUEST)
265                     .build()),
266                 rpcResult -> controlBundle.invoke(new ControlBundleInputBuilder()
267                     .setNode(nodeRef)
268                     .setBundleId(bundleIdValue)
269                     .setFlags(BUNDLE_FLAGS)
270                     .setType(BundleControlType.ONFBCTOPENREQUEST)
271                     .build()), MoreExecutors.directExecutor());
272
273             final var addBundleMessagesFuture = Futures.transformAsync(openBundleMessagesFuture,
274                 rpcResult -> rpcResult.isSuccessful()
275                     ? addBundleMessages.invoke(new AddBundleMessagesInputBuilder()
276                         .setNode(nodeRef)
277                         .setBundleId(bundleIdValue)
278                         .setFlags(BUNDLE_FLAGS)
279                         .setMessages(createMessages(nodeRef))
280                         .build())
281                     : FluentFutures.immediateNullFluentFuture(), MoreExecutors.directExecutor());
282             final var nodeId = getDpnIdFromNodeName(node);
283             try {
284                 if (addBundleMessagesFuture.get().isSuccessful()) {
285                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
286                         FluentFutures.immediateNullFluentFuture()));
287                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
288                     return true;
289                 } else {
290                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
291                     return false;
292                 }
293             } catch (InterruptedException | ExecutionException e) {
294                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
295                 return false;
296             }
297         }
298     }
299
300     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
301         private final Uint64 nodeId;
302
303         private CommitActiveBundleCallback(final Uint64 nodeId) {
304             this.nodeId = nodeId;
305         }
306
307         @Override
308         public void onSuccess(final RpcResult<?> rpcResult) {
309             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
310             bundleIdMap.remove(nodeId);
311         }
312
313         @Override
314         public void onFailure(final Throwable throwable) {
315             LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
316         }
317     }
318
319     private static <D> Function<RpcResult<D>, RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(
320             final String action) {
321         return input -> {
322             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
323             if (input != null) {
324                 final var errors = new ArrayList<RpcError>();
325                 if (!input.isSuccessful()) {
326                     errors.addAll(input.getErrors());
327                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
328                 } else {
329                     resultSink = RpcResultBuilder.success();
330                 }
331             } else {
332                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
333                         .withError(ErrorType.APPLICATION, "action of " + action + " failed");
334             }
335             return resultSink.build();
336         };
337     }
338
339     private void registerRpc(final DeviceInfo node) {
340         final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
341         LOG.debug("The path is registered : {}", path);
342         rpcRegistrations.put(node.getNodeId().getValue(), rpcProviderService.registerRpcImplementations(
343             ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
344                 .put(GetActiveBundle.class, this::getActiveBundle)
345                 .put(CommitActiveBundle.class, this::commitActiveBundle)
346                 .build(), Set.of(path)));
347     }
348
349     private void deregisterRpc(final DeviceInfo node) {
350         final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
351         LOG.debug("The path is unregistered : {}", path);
352         final var reg = rpcRegistrations.remove(node.getNodeId().getValue());
353         if (reg != null) {
354             reg.close();
355         }
356     }
357
358     @NonNullByDefault
359     record BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
360         BundleDetails {
361             requireNonNull(bundleId);
362             requireNonNull(result);
363         }
364     }
365
366     private static Uint64 getDpnIdFromNodeName(final String nodeName) {
367         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
368         return Uint64.valueOf(dpnId);
369     }
370 }