/*
 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
import org.opendaylight.serviceutils.upgrade.UpgradeState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

85 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
86 ReconciliationNotificationListener, AutoCloseable {
88 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
89 private static final int THREAD_POOL_SIZE = 4;
90 private static final AtomicLong BUNDLE_ID = new AtomicLong();
91 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
92 private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
93 .getInteger("arbitrator.reconciliation.manager.priority", 1/*default*/);
94 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
95 private static final String SEPARATOR = ":";
97 private final SalBundleService salBundleService;
98 private final ReconciliationManager reconciliationManager;
99 private final RoutedRpcRegistration routedRpcReg;
100 private final UpgradeState upgradeState;
101 private NotificationRegistration registration;
102 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
103 private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
105 public ArbitratorReconciliationManagerImpl(final RpcProviderRegistry rpcRegistry,
106 final ReconciliationManager reconciliationManager, final UpgradeState upgradeState) {
107 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
108 this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
109 "ReconciliationManager cannot be null!");
110 this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
111 "RPC SalBundlService not found.");
112 this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
114 this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
117 public void start() {
118 registration = reconciliationManager.registerService(this);
119 LOG.info("ArbitratorReconciliationManager has started successfully.");
123 public void close() throws Exception {
125 registration.close();
130 public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
131 CommitActiveBundleInput input) {
132 BigInteger nodeId = input.getNodeId();
133 if (bundleIdMap.containsKey(nodeId)) {
134 BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
135 if (bundleId != null) {
136 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
137 .setNode(input.getNode()).setBundleId(bundleId)
138 .setFlags(BUNDLE_FLAGS)
139 .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
140 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
141 .controlBundle(commitBundleInput);
142 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
143 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
144 MoreExecutors.directExecutor());
145 return Futures.transform(
147 this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
148 MoreExecutors.directExecutor());
151 return RpcResultBuilder.success((new CommitActiveBundleOutputBuilder()
152 .setResult(null).build()))
153 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
154 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
158 public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
159 BigInteger nodeId = input.getNodeId();
160 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
161 if (bundleDetails != null) {
163 //This blocking call is used to prevent the applications from pushing flows and groups via the default
164 // pipeline when the commit bundle is ongoing.
165 bundleDetails.getResult().get();
166 return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
167 .setResult(bundleDetails.getBundleId()).build())).buildFuture();
168 } catch (InterruptedException | ExecutionException | NullPointerException e) {
169 return RpcResultBuilder.<GetActiveBundleOutput>failed()
170 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
171 null, e.getMessage()))).buildFuture();
174 return RpcResultBuilder.success((new GetActiveBundleOutputBuilder()
175 .setResult(null).build())).buildFuture();
179 public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
181 if (upgradeState.isUpgradeInProgress()) {
182 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
183 return reconcileConfiguration(node);
185 LOG.trace("arbitrator reconciliation is disabled");
186 return Futures.immediateFuture(true);
190 public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
191 LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
192 InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
193 .augmentation(FlowCapableNode.class);
194 bundleIdMap.remove(connectedNode);
196 return Futures.immediateFuture(true);
200 public int getPriority() {
201 return ARBITRATOR_RECONCILIATION_PRIORITY;
205 public String getName() {
210 public ResultState getResultState() {
211 return ResultState.DONOTHING;
214 private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
215 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
216 ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
217 return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
220 private Messages createMessages(final NodeRef nodeRef) {
221 final List<Message> messages = new ArrayList<>();
222 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
223 new BundleRemoveFlowCaseBuilder()
224 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
227 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
228 new BundleRemoveGroupCaseBuilder()
229 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
231 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
232 return new MessagesBuilder().setMessage(messages).build();
235 private Flow getDeleteAllFlow() {
236 final FlowBuilder flowBuilder = new FlowBuilder();
237 flowBuilder.setTableId(OFConstants.OFPTT_ALL);
238 return flowBuilder.build();
241 private Group getDeleteAllGroup() {
242 final GroupBuilder groupBuilder = new GroupBuilder();
243 groupBuilder.setGroupType(GroupTypes.GroupAll);
244 groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
245 return groupBuilder.build();
248 private class ArbitratorReconciliationTask implements Callable<Boolean> {
249 final DeviceInfo deviceInfo;
251 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
252 this.deviceInfo = deviceInfo;
256 public Boolean call() {
257 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
258 .augmentation(FlowCapableNode.class);
259 String node = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
260 BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
261 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
262 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
264 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
265 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
266 .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
268 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
269 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
270 .setType(BundleControlType.ONFBCTOPENREQUEST).build();
272 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
273 .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
274 .setMessages(createMessages(nodeRef)).build();
276 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
277 .controlBundle(closeBundleInput);
279 ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
280 .transformAsync(closeBundle, rpcResult -> salBundleService
281 .controlBundle(openBundleInput), MoreExecutors.directExecutor());
283 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
284 .transformAsync(openBundleMessagesFuture, rpcResult -> {
285 if (rpcResult.isSuccessful()) {
286 return salBundleService
287 .addBundleMessages(addBundleMessagesInput);
289 return Futures.immediateFuture(null);
290 }, MoreExecutors.directExecutor());
291 BigInteger nodeId = getDpnIdFromNodeName(node);
293 if (addBundleMessagesFuture.get().isSuccessful()) {
294 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
295 Futures.immediateFuture(null)));
296 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
297 + " for application programming.", nodeId);
300 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
303 } catch (InterruptedException | ExecutionException e) {
304 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
310 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
311 private final BigInteger nodeId;
313 private CommitActiveBundleCallback(final BigInteger nodeId) {
314 this.nodeId = nodeId;
318 public void onSuccess(RpcResult<?> rpcResult) {
319 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
320 bundleIdMap.remove(nodeId);
324 public void onFailure(Throwable throwable) {
325 LOG.error("Error while performing arbitrator reconciliation for device {}",
330 private <D> Function<RpcResult<D>,
331 RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
333 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
335 List<RpcError> errors = new ArrayList<>();
336 if (!input.isSuccessful()) {
337 errors.addAll(input.getErrors());
338 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
340 resultSink = RpcResultBuilder.success();
343 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
344 .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
346 return resultSink.build();
350 private void registerRpc(DeviceInfo node) {
351 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
352 .child(Node.class, new NodeKey(node.getNodeId()));
353 LOG.debug("The path is registered : {}", path);
354 routedRpcReg.registerPath(NodeContext.class, path);
357 private void deregisterRpc(DeviceInfo node) {
358 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
359 new NodeKey(node.getNodeId()));
360 LOG.debug("The path is unregistered : {}", path);
361 routedRpcReg.unregisterPath(NodeContext.class, path);
364 private static class BundleDetails {
365 private final BundleId bundleId;
366 private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
368 BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
369 this.bundleId = bundleId;
370 this.result = result;
373 public BundleId getBundleId() {
377 public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
382 private BigInteger getDpnIdFromNodeName(String nodeName) {
383 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
384 return new BigInteger(dpnId);