2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.apache.aries.blueprint.annotation.service.Reference;
34 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
41 import org.opendaylight.serviceutils.upgrade.UpgradeState;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
85 import org.opendaylight.yangtools.yang.common.Uint64;
86 import org.slf4j.Logger;
87 import org.slf4j.LoggerFactory;
90 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
91 ReconciliationNotificationListener, AutoCloseable {
93 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
94 private static final int THREAD_POOL_SIZE = 4;
95 private static final AtomicLong BUNDLE_ID = new AtomicLong();
96 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
97 private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
98 .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
99 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
100 private static final String SEPARATOR = ":";
102 private final SalBundleService salBundleService;
103 private final ReconciliationManager reconciliationManager;
104 private final RoutedRpcRegistration routedRpcReg;
105 private final UpgradeState upgradeState;
106 private NotificationRegistration registration;
107 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
108 private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
111 public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
112 @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
113 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
114 this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
115 "ReconciliationManager cannot be null!");
116 this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
117 "RPC SalBundlService not found.");
118 this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
120 this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
124 public void start() {
125 registration = reconciliationManager.registerService(this);
126 LOG.info("ArbitratorReconciliationManager has started successfully.");
131 public void close() throws Exception {
133 if (registration != null) {
134 registration.close();
140 public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
141 CommitActiveBundleInput input) {
142 Uint64 nodeId = input.getNodeId();
143 if (bundleIdMap.containsKey(nodeId)) {
144 BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
145 if (bundleId != null) {
146 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
147 .setNode(input.getNode()).setBundleId(bundleId)
148 .setFlags(BUNDLE_FLAGS)
149 .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
150 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
151 .controlBundle(commitBundleInput);
152 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
153 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
154 MoreExecutors.directExecutor());
155 return Futures.transform(
157 this.createRpcResultCondenser("committed active bundle"),
158 MoreExecutors.directExecutor());
161 return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
162 .setResult(null).build())
163 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
164 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
168 public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
169 Uint64 nodeId = input.getNodeId();
170 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
171 if (bundleDetails != null) {
173 //This blocking call is used to prevent the applications from pushing flows and groups via the default
174 // pipeline when the commit bundle is ongoing.
175 bundleDetails.getResult().get();
176 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
177 .setResult(bundleDetails.getBundleId()).build()).buildFuture();
178 } catch (InterruptedException | ExecutionException | NullPointerException e) {
179 return RpcResultBuilder.<GetActiveBundleOutput>failed()
180 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
181 null, e.getMessage()))).buildFuture();
184 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
185 .setResult(null).build()).buildFuture();
189 public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
191 if (upgradeState.isUpgradeInProgress()) {
192 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
193 return reconcileConfiguration(node);
195 LOG.trace("arbitrator reconciliation is disabled");
196 return Futures.immediateFuture(true);
200 public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
201 Uint64 datapathId = node.getDatapathId();
202 LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
203 bundleIdMap.remove(datapathId);
205 return Futures.immediateFuture(true);
209 public int getPriority() {
210 return ARBITRATOR_RECONCILIATION_PRIORITY;
214 public String getName() {
219 public ResultState getResultState() {
220 return ResultState.DONOTHING;
223 private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
224 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
225 ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
226 return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
229 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
230 justification = "https://github.com/spotbugs/spotbugs/issues/811")
231 private Messages createMessages(final NodeRef nodeRef) {
232 final List<Message> messages = new ArrayList<>();
233 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
234 new BundleRemoveFlowCaseBuilder()
235 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
238 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
239 new BundleRemoveGroupCaseBuilder()
240 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
242 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
243 return new MessagesBuilder().setMessage(messages).build();
246 private Flow getDeleteAllFlow() {
247 final FlowBuilder flowBuilder = new FlowBuilder();
248 flowBuilder.setTableId(OFConstants.OFPTT_ALL);
249 return flowBuilder.build();
252 private Group getDeleteAllGroup() {
253 final GroupBuilder groupBuilder = new GroupBuilder();
254 groupBuilder.setGroupType(GroupTypes.GroupAll);
255 groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
256 return groupBuilder.build();
259 private class ArbitratorReconciliationTask implements Callable<Boolean> {
260 final DeviceInfo deviceInfo;
262 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
263 this.deviceInfo = deviceInfo;
267 public Boolean call() {
268 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
269 .augmentation(FlowCapableNode.class);
270 String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
271 BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
272 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
273 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
275 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
276 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
277 .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
279 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
280 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
281 .setType(BundleControlType.ONFBCTOPENREQUEST).build();
283 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
284 .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
285 .setMessages(createMessages(nodeRef)).build();
287 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
288 .controlBundle(closeBundleInput);
290 ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
291 .transformAsync(closeBundle, rpcResult -> salBundleService
292 .controlBundle(openBundleInput), MoreExecutors.directExecutor());
294 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
295 .transformAsync(openBundleMessagesFuture, rpcResult -> {
296 if (rpcResult.isSuccessful()) {
297 return salBundleService
298 .addBundleMessages(addBundleMessagesInput);
300 return Futures.immediateFuture(null);
301 }, MoreExecutors.directExecutor());
302 Uint64 nodeId = getDpnIdFromNodeName(node);
304 if (addBundleMessagesFuture.get().isSuccessful()) {
305 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
306 Futures.immediateFuture(null)));
307 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
308 + " for application programming.", nodeId);
311 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
314 } catch (InterruptedException | ExecutionException e) {
315 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
321 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
322 private final Uint64 nodeId;
324 private CommitActiveBundleCallback(final Uint64 nodeId) {
325 this.nodeId = nodeId;
329 public void onSuccess(RpcResult<?> rpcResult) {
330 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
331 bundleIdMap.remove(nodeId);
335 public void onFailure(Throwable throwable) {
336 LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
340 private <D> Function<RpcResult<D>,
341 RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
343 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
345 List<RpcError> errors = new ArrayList<>();
346 if (!input.isSuccessful()) {
347 errors.addAll(input.getErrors());
348 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
350 resultSink = RpcResultBuilder.success();
353 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
354 .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
356 return resultSink.build();
360 private void registerRpc(DeviceInfo node) {
361 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
362 .child(Node.class, new NodeKey(node.getNodeId()));
363 LOG.debug("The path is registered : {}", path);
364 routedRpcReg.registerPath(NodeContext.class, path);
367 private void deregisterRpc(DeviceInfo node) {
368 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
369 new NodeKey(node.getNodeId()));
370 LOG.debug("The path is unregistered : {}", path);
371 routedRpcReg.unregisterPath(NodeContext.class, path);
374 private static class BundleDetails {
375 private final BundleId bundleId;
376 private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
378 BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
379 this.bundleId = bundleId;
380 this.result = result;
383 public BundleId getBundleId() {
387 public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
392 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
393 justification = "https://github.com/spotbugs/spotbugs/issues/811")
394 private Uint64 getDpnIdFromNodeName(String nodeName) {
395 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
396 return Uint64.valueOf(dpnId);