/*
 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.atomic.AtomicLong;
30 import javax.annotation.PostConstruct;
31 import javax.annotation.PreDestroy;
32 import javax.inject.Inject;
33 import javax.inject.Singleton;
34 import org.apache.aries.blueprint.annotation.service.Reference;
35 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
36 import org.opendaylight.mdsal.binding.api.RpcProviderService;
37 import org.opendaylight.openflowplugin.api.OFConstants;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
41 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
42 import org.opendaylight.serviceutils.upgrade.UpgradeState;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.concepts.ObjectRegistration;
81 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
82 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
83 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
84 import org.opendaylight.yangtools.yang.binding.RpcService;
85 import org.opendaylight.yangtools.yang.common.RpcError;
86 import org.opendaylight.yangtools.yang.common.RpcResult;
87 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
88 import org.opendaylight.yangtools.yang.common.Uint32;
89 import org.opendaylight.yangtools.yang.common.Uint64;
90 import org.slf4j.Logger;
91 import org.slf4j.LoggerFactory;
94 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
95 ReconciliationNotificationListener, AutoCloseable {
97 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
98 private static final int THREAD_POOL_SIZE = 4;
99 private static final AtomicLong BUNDLE_ID = new AtomicLong();
100 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
101 private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
102 .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
103 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
104 private static final String SEPARATOR = ":";
106 private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
107 .setRemoveFlowCaseData(
108 new RemoveFlowCaseDataBuilder(new FlowBuilder().setTableId(OFConstants.OFPTT_ALL).build()).build())
110 private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
111 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
112 .setGroupType(GroupTypes.GroupAll)
113 .setGroupId(new GroupId(OFConstants.OFPG_ALL))
117 private final SalBundleService salBundleService;
118 private final ReconciliationManager reconciliationManager;
119 private final RpcProviderService rpcProviderService;
120 private final UpgradeState upgradeState;
121 private NotificationRegistration registration;
122 private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
123 Executors.newFixedThreadPool(THREAD_POOL_SIZE));
124 private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
125 private final ConcurrentMap<String,
126 ObjectRegistration<? extends RpcService>> rpcRegistrations = new ConcurrentHashMap<>();
129 public ArbitratorReconciliationManagerImpl(
130 @Reference ReconciliationManager reconciliationManager, @Reference RpcProviderService rpcProviderService,
131 @Reference final RpcConsumerRegistry rpcRegistry, @Reference UpgradeState upgradeState) {
132 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
133 this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
134 "ReconciliationManager cannot be null!");
135 this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
136 "RPC SalBundleService not found.");
137 this.rpcProviderService = rpcProviderService;
138 this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
142 public void start() {
143 registration = reconciliationManager.registerService(this);
144 LOG.info("ArbitratorReconciliationManager has started successfully.");
149 public void close() throws Exception {
151 if (registration != null) {
152 registration.close();
158 public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
159 CommitActiveBundleInput input) {
160 Uint64 nodeId = input.getNodeId();
161 if (bundleIdMap.containsKey(nodeId)) {
162 BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
163 if (bundleId != null) {
164 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
165 .setNode(input.getNode())
166 .setBundleId(bundleId)
167 .setFlags(BUNDLE_FLAGS)
168 .setType(BundleControlType.ONFBCTCOMMITREQUEST)
170 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
171 .controlBundle(commitBundleInput);
172 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
174 Futures.addCallback(rpcResult,
175 new CommitActiveBundleCallback(nodeId),
176 MoreExecutors.directExecutor());
177 return Futures.transform(
179 createRpcResultCondenser("committed active bundle"),
180 MoreExecutors.directExecutor());
183 return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
184 .setResult(null).build())
185 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
186 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
190 public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
191 Uint64 nodeId = input.getNodeId();
192 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
193 if (bundleDetails != null) {
195 //This blocking call is used to prevent the applications from pushing flows and groups via the default
196 // pipeline when the commit bundle is ongoing.
197 bundleDetails.getResult().get();
198 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
199 .setResult(bundleDetails.getBundleId())
202 } catch (InterruptedException | ExecutionException | NullPointerException e) {
203 return RpcResultBuilder.<GetActiveBundleOutput>failed()
204 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
205 null, e.getMessage()))).buildFuture();
208 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
209 .setResult(null).build()).buildFuture();
213 public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
215 if (upgradeState.isUpgradeInProgress()) {
216 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
217 return reconcileConfiguration(node);
219 LOG.trace("arbitrator reconciliation is disabled");
220 return FluentFutures.immediateTrueFluentFuture();
224 public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
225 Uint64 datapathId = node.getDatapathId();
226 LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
227 bundleIdMap.remove(datapathId);
229 return FluentFutures.immediateTrueFluentFuture();
233 public int getPriority() {
234 return ARBITRATOR_RECONCILIATION_PRIORITY;
238 public String getName() {
243 public ResultState getResultState() {
244 return ResultState.DONOTHING;
247 private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
248 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
249 ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
250 return executor.submit(upgradeReconTask);
253 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
254 justification = "https://github.com/spotbugs/spotbugs/issues/811")
255 private static Messages createMessages(final NodeRef nodeRef) {
256 final List<Message> messages = new ArrayList<>();
257 messages.add(new MessageBuilder()
259 .setBundleInnerMessage(DELETE_ALL_FLOW).build());
260 messages.add(new MessageBuilder()
262 .setBundleInnerMessage(DELETE_ALL_GROUP).build());
263 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
264 return new MessagesBuilder().setMessage(messages).build();
267 private class ArbitratorReconciliationTask implements Callable<Boolean> {
268 final DeviceInfo deviceInfo;
270 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
271 this.deviceInfo = deviceInfo;
275 public Boolean call() {
276 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
277 .augmentation(FlowCapableNode.class);
278 String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
279 BundleId bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
280 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
281 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
283 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder()
285 .setBundleId(bundleIdValue)
286 .setFlags(BUNDLE_FLAGS)
287 .setType(BundleControlType.ONFBCTCLOSEREQUEST)
290 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
292 .setBundleId(bundleIdValue)
293 .setFlags(BUNDLE_FLAGS)
294 .setType(BundleControlType.ONFBCTOPENREQUEST)
297 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
299 .setBundleId(bundleIdValue)
300 .setFlags(BUNDLE_FLAGS)
301 .setMessages(createMessages(nodeRef))
304 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
305 .controlBundle(closeBundleInput);
307 ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
308 .transformAsync(closeBundle, rpcResult -> salBundleService
309 .controlBundle(openBundleInput), MoreExecutors.directExecutor());
311 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
312 .transformAsync(openBundleMessagesFuture, rpcResult -> {
313 if (rpcResult.isSuccessful()) {
314 return salBundleService
315 .addBundleMessages(addBundleMessagesInput);
317 return FluentFutures.immediateNullFluentFuture();
318 }, MoreExecutors.directExecutor());
319 Uint64 nodeId = getDpnIdFromNodeName(node);
321 if (addBundleMessagesFuture.get().isSuccessful()) {
322 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,FluentFutures.immediateNullFluentFuture()));
323 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
326 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
329 } catch (InterruptedException | ExecutionException e) {
330 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
336 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
337 private final Uint64 nodeId;
339 private CommitActiveBundleCallback(final Uint64 nodeId) {
340 this.nodeId = nodeId;
344 public void onSuccess(RpcResult<?> rpcResult) {
345 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
346 bundleIdMap.remove(nodeId);
350 public void onFailure(Throwable throwable) {
351 LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
355 private static <D> Function<RpcResult<D>,
356 RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
358 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
360 List<RpcError> errors = new ArrayList<>();
361 if (!input.isSuccessful()) {
362 errors.addAll(input.getErrors());
363 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
365 resultSink = RpcResultBuilder.success();
368 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
369 .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
371 return resultSink.build();
375 private void registerRpc(DeviceInfo node) {
376 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
377 .child(Node.class, new NodeKey(node.getNodeId()));
378 LOG.debug("The path is registered : {}", path);
379 ObjectRegistration<? extends RpcService> rpcRegistration =
380 rpcProviderService.registerRpcImplementation(ArbitratorReconcileService.class,
381 this, ImmutableSet.of(path));
382 rpcRegistrations.put(node.getNodeId().getValue(), rpcRegistration);
385 private void deregisterRpc(DeviceInfo node) {
386 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
387 .child(Node.class, new NodeKey(node.getNodeId()));
388 LOG.debug("The path is unregistered : {}", path);
389 ObjectRegistration<? extends RpcService> rpcRegistration = rpcRegistrations.get(node.getNodeId().getValue());
390 if (rpcRegistration != null) {
391 rpcRegistration.close();
392 rpcRegistrations.remove(node.getNodeId().getValue());
396 private static class BundleDetails {
397 private final BundleId bundleId;
398 private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
400 BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
401 this.bundleId = bundleId;
402 this.result = result;
405 public BundleId getBundleId() {
409 public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
414 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
415 justification = "https://github.com/spotbugs/spotbugs/issues/811")
416 private static Uint64 getDpnIdFromNodeName(String nodeName) {
417 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
418 return Uint64.valueOf(dpnId);