be8168f19ba32664aceeb8affb806fae353e5b72
[openflowplugin.git] / applications / arbitratorreconciliation / impl / src / main / java / org / opendaylight / openflowplugin / applications / arbitratorreconciliation / impl / ArbitratorReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Function;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.ImmutableSet;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.annotation.PostConstruct;
32 import javax.annotation.PreDestroy;
33 import javax.inject.Inject;
34 import javax.inject.Singleton;
35 import org.apache.aries.blueprint.annotation.service.Reference;
36 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.openflowplugin.api.OFConstants;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
40 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
41 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
42 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
43 import org.opendaylight.serviceutils.upgrade.UpgradeState;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.concepts.ObjectRegistration;
81 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
82 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
83 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
84 import org.opendaylight.yangtools.yang.binding.RpcService;
85 import org.opendaylight.yangtools.yang.common.RpcError;
86 import org.opendaylight.yangtools.yang.common.RpcResult;
87 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
88 import org.opendaylight.yangtools.yang.common.Uint32;
89 import org.opendaylight.yangtools.yang.common.Uint64;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
92
93 @Singleton
94 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
95         ReconciliationNotificationListener, AutoCloseable {
96
97     private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
98     private static final int THREAD_POOL_SIZE = 4;
99     private static final AtomicLong BUNDLE_ID = new AtomicLong();
100     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
101     private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
102             .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
103     private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
104     private static final String SEPARATOR = ":";
105
106     private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
107             .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
108                 .setTableId(OFConstants.OFPTT_ALL)
109                 .build())
110             .build();
111     private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
112             .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
113                     .setGroupType(GroupTypes.GroupAll)
114                     .setGroupId(new GroupId(OFConstants.OFPG_ALL))
115                     .build()).build())
116             .build();
117
118     private final SalBundleService salBundleService;
119     private final ReconciliationManager reconciliationManager;
120     private final RpcProviderService rpcProviderService;
121     private final UpgradeState upgradeState;
122     private NotificationRegistration registration;
123     private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
124             Executors.newFixedThreadPool(THREAD_POOL_SIZE));
125     private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
126     private final ConcurrentMap<String,
127             ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
128
129     @Inject
130     public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager,
131             @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcRegistry,
132             @Reference final UpgradeState upgradeState) {
133         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
134         this.reconciliationManager = requireNonNull(reconciliationManager, "ReconciliationManager cannot be null!");
135         this.salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
136                 "RPC SalBundleService not found.");
137         this.rpcProviderService = rpcProviderService;
138         this.upgradeState = requireNonNull(upgradeState, "UpgradeState cannot be null!");
139     }
140
141     @PostConstruct
142     public void start() {
143         registration = reconciliationManager.registerService(this);
144         LOG.info("ArbitratorReconciliationManager has started successfully.");
145     }
146
147     @Override
148     @PreDestroy
149     public void close() throws Exception {
150         executor.shutdown();
151         if (registration != null) {
152             registration.close();
153             registration = null;
154         }
155     }
156
157     @Override
158     public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
159             final CommitActiveBundleInput input) {
160         Uint64 nodeId = input.getNodeId();
161         if (bundleIdMap.containsKey(nodeId)) {
162             BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
163             if (bundleId != null) {
164                 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
165                         .setNode(input.getNode())
166                         .setBundleId(bundleId)
167                         .setFlags(BUNDLE_FLAGS)
168                         .setType(BundleControlType.ONFBCTCOMMITREQUEST)
169                         .build();
170                 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
171                         .controlBundle(commitBundleInput);
172                 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
173
174                 Futures.addCallback(rpcResult,
175                         new CommitActiveBundleCallback(nodeId),
176                         MoreExecutors.directExecutor());
177                 return Futures.transform(
178                         rpcResult,
179                         createRpcResultCondenser("committed active bundle"),
180                         MoreExecutors.directExecutor());
181             }
182         }
183         return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
184                 .setResult(null).build())
185                 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
186                         null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
187     }
188
189     @Override
190     public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
191         Uint64 nodeId = input.getNodeId();
192         BundleDetails bundleDetails = bundleIdMap.get(nodeId);
193         if (bundleDetails != null) {
194             try {
195                 //This blocking call is used to prevent the applications from pushing flows and groups via the default
196                 // pipeline when the commit bundle is ongoing.
197                 bundleDetails.getResult().get();
198                 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
199                         .setResult(bundleDetails.getBundleId())
200                         .build())
201                         .buildFuture();
202             } catch (InterruptedException | ExecutionException | NullPointerException e) {
203                 return RpcResultBuilder.<GetActiveBundleOutput>failed()
204                         .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
205                                 null, e.getMessage()))).buildFuture();
206             }
207         }
208         return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
209                 .setResult(null).build()).buildFuture();
210     }
211
212     @Override
213     public ListenableFuture<Boolean> startReconciliation(final DeviceInfo node) {
214         registerRpc(node);
215         if (upgradeState.isUpgradeInProgress()) {
216             LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
217             return reconcileConfiguration(node);
218         }
219         LOG.trace("arbitrator reconciliation is disabled");
220         return FluentFutures.immediateTrueFluentFuture();
221     }
222
223     @Override
224     public ListenableFuture<Boolean> endReconciliation(final DeviceInfo node) {
225         Uint64 datapathId = node.getDatapathId();
226         LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
227         bundleIdMap.remove(datapathId);
228         deregisterRpc(node);
229         return FluentFutures.immediateTrueFluentFuture();
230     }
231
232     @Override
233     public int getPriority() {
234         return ARBITRATOR_RECONCILIATION_PRIORITY;
235     }
236
237     @Override
238     public String getName() {
239         return SERVICE_NAME;
240     }
241
242     @Override
243     public ResultState getResultState() {
244         return ResultState.DONOTHING;
245     }
246
247     private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
248         LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
249         ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
250         return executor.submit(upgradeReconTask);
251     }
252
253     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
254             justification = "https://github.com/spotbugs/spotbugs/issues/811")
255     private static Messages createMessages(final NodeRef nodeRef) {
256         final List<Message> messages = new ArrayList<>();
257         messages.add(new MessageBuilder()
258                 .setNode(nodeRef)
259                 .setBundleInnerMessage(DELETE_ALL_FLOW).build());
260         messages.add(new MessageBuilder()
261                 .setNode(nodeRef)
262                 .setBundleInnerMessage(DELETE_ALL_GROUP).build());
263         LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
264         return new MessagesBuilder().setMessage(messages).build();
265     }
266
267     private class ArbitratorReconciliationTask implements Callable<Boolean> {
268         final DeviceInfo deviceInfo;
269
270         ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
271             this.deviceInfo = deviceInfo;
272         }
273
274         @Override
275         public Boolean call() {
276             InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
277                     .augmentation(FlowCapableNode.class);
278             String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
279             BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
280             LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
281             final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
282
283             final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
284                     .setNode(nodeRef)
285                     .setBundleId(bundleIdValue)
286                     .setFlags(BUNDLE_FLAGS)
287                     .setType(BundleControlType.ONFBCTCLOSEREQUEST)
288                     .build();
289
290             final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
291                     .setNode(nodeRef)
292                     .setBundleId(bundleIdValue)
293                     .setFlags(BUNDLE_FLAGS)
294                     .setType(BundleControlType.ONFBCTOPENREQUEST)
295                     .build();
296
297             final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
298                     .setNode(nodeRef)
299                     .setBundleId(bundleIdValue)
300                     .setFlags(BUNDLE_FLAGS)
301                     .setMessages(createMessages(nodeRef))
302                     .build();
303
304             ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
305                     .controlBundle(closeBundleInput);
306
307             ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
308                     .transformAsync(closeBundle, rpcResult -> salBundleService
309                             .controlBundle(openBundleInput), MoreExecutors.directExecutor());
310
311             ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
312                     .transformAsync(openBundleMessagesFuture, rpcResult -> {
313                         if (rpcResult.isSuccessful()) {
314                             return salBundleService
315                                     .addBundleMessages(addBundleMessagesInput);
316                         }
317                         return FluentFutures.immediateNullFluentFuture();
318                     }, MoreExecutors.directExecutor());
319             Uint64 nodeId = getDpnIdFromNodeName(node);
320             try {
321                 if (addBundleMessagesFuture.get().isSuccessful()) {
322                     bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
323                     LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
324                     return true;
325                 } else {
326                     LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
327                     return false;
328                 }
329             } catch (InterruptedException | ExecutionException e) {
330                 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
331                 return false;
332             }
333         }
334     }
335
336     public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
337         private final Uint64 nodeId;
338
339         private CommitActiveBundleCallback(final Uint64 nodeId) {
340             this.nodeId = nodeId;
341         }
342
343         @Override
344         public void onSuccess(final RpcResult<?> rpcResult) {
345             LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
346             bundleIdMap.remove(nodeId);
347         }
348
349         @Override
350         public void onFailure(final Throwable throwable) {
351             LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
352         }
353     }
354
355     private static <D> Function<RpcResult<D>,
356             RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
357         return input -> {
358             final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
359             if (input != null) {
360                 List<RpcError> errors = new ArrayList<>();
361                 if (!input.isSuccessful()) {
362                     errors.addAll(input.getErrors());
363                     resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
364                 } else {
365                     resultSink = RpcResultBuilder.success();
366                 }
367             } else {
368                 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
369                         .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
370             }
371             return resultSink.build();
372         };
373     }
374
375     private void registerRpc(final DeviceInfo node) {
376         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
377                 .child(Node.class, new NodeKey(node.getNodeId()));
378         LOG.debug("The path is registered : {}", path);
379         ObjectRegistration<? extends RpcService> rpcRegistration =
380                 rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
381                         this, ImmutableSet.of(path));
382         rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
383     }
384
385     private void deregisterRpc(final DeviceInfo node) {
386         KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
387                 .child(Node.class, new NodeKey(node.getNodeId()));
388         LOG.debug("The path is unregistered : {}", path);
389         ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
390         if (rpcRegistration != null) {
391             rpcRegistration.close();
392             rpcRegistrations.remove(node.getNodeId().getValue());
393         }
394     }
395
396     private static class BundleDetails {
397         private final BundleId bundleId;
398         private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
399
400         BundleDetails(final BundleId bundleId, final ListenableFuture<RpcResult<ControlBundleOutput>> result) {
401             this.bundleId = bundleId;
402             this.result = result;
403         }
404
405         public BundleId getBundleId() {
406             return bundleId;
407         }
408
409         public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
410             return result;
411         }
412     }
413
414     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
415             justification = "https://github.com/spotbugs/spotbugs/issues/811")
416     private static Uint64 getDpnIdFromNodeName(final String nodeName) {
417         String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
418         return Uint64.valueOf(dpnId);
419     }
420 }