/*
 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.apache.aries.blueprint.annotation.service.Reference;
34 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
41 import org.opendaylight.serviceutils.upgrade.UpgradeState;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
89 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
90 ReconciliationNotificationListener, AutoCloseable {
92 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
93 private static final int THREAD_POOL_SIZE = 4;
94 private static final AtomicLong BUNDLE_ID = new AtomicLong();
95 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
96 private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
97 .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
98 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
99 private static final String SEPARATOR = ":";
101 private final SalBundleService salBundleService;
102 private final ReconciliationManager reconciliationManager;
103 private final RoutedRpcRegistration routedRpcReg;
104 private final UpgradeState upgradeState;
105 private NotificationRegistration registration;
106 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
107 private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
/**
 * Creates the manager, looks up the bundle RPC service and registers this instance as the routed
 * implementation of {@link ArbitratorReconcileService}.
 *
 * <p>Every argument is validated before the routed RPC registration is made, so a rejected argument
 * cannot leave a registration behind for an object whose construction failed.
 *
 * @param rpcRegistry registry used to resolve {@link SalBundleService} and to register the routed RPC
 * @param reconciliationManager manager this listener registers itself with in {@code start()}
 * @param upgradeState consulted when a reconciliation is requested for a device
 * @throws IllegalArgumentException if {@code rpcRegistry} is null
 * @throws NullPointerException if another argument is null or {@link SalBundleService} is not available
 */
@Inject
public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
        @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
    Preconditions.checkArgument(rpcRegistry != null, "RpcProviderRegistry cannot be null!");
    this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
            "ReconciliationManager cannot be null!");
    this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
    this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
            "RPC SalBundleService not found.");
    // Registered last: this is the only step with an externally visible side effect, so it must not
    // run when any of the checks above is going to throw.
    this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class, this);
}
123 public void start() {
124 registration = reconciliationManager.registerService(this);
125 LOG.info("ArbitratorReconciliationManager has started successfully.");
130 public void close() throws Exception {
132 if (registration != null) {
133 registration.close();
139 public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
140 CommitActiveBundleInput input) {
141 BigInteger nodeId = input.getNodeId();
142 if (bundleIdMap.containsKey(nodeId)) {
143 BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
144 if (bundleId != null) {
145 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
146 .setNode(input.getNode()).setBundleId(bundleId)
147 .setFlags(BUNDLE_FLAGS)
148 .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
149 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
150 .controlBundle(commitBundleInput);
151 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
152 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
153 MoreExecutors.directExecutor());
154 return Futures.transform(
156 this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
157 MoreExecutors.directExecutor());
160 return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
161 .setResult(null).build())
162 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
163 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
167 public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
168 BigInteger nodeId = input.getNodeId();
169 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
170 if (bundleDetails != null) {
172 //This blocking call is used to prevent the applications from pushing flows and groups via the default
173 // pipeline when the commit bundle is ongoing.
174 bundleDetails.getResult().get();
175 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
176 .setResult(bundleDetails.getBundleId()).build()).buildFuture();
177 } catch (InterruptedException | ExecutionException | NullPointerException e) {
178 return RpcResultBuilder.<GetActiveBundleOutput>failed()
179 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
180 null, e.getMessage()))).buildFuture();
183 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
184 .setResult(null).build()).buildFuture();
188 public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
190 if (upgradeState.isUpgradeInProgress()) {
191 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
192 return reconcileConfiguration(node);
194 LOG.trace("arbitrator reconciliation is disabled");
195 return Futures.immediateFuture(true);
199 public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
200 LOG.trace("Stopping arbitrator reconciliation for node {}", node.getDatapathId());
201 InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
202 .augmentation(FlowCapableNode.class);
203 bundleIdMap.remove(connectedNode);
205 return Futures.immediateFuture(true);
209 public int getPriority() {
210 return ARBITRATOR_RECONCILIATION_PRIORITY;
214 public String getName() {
219 public ResultState getResultState() {
220 return ResultState.DONOTHING;
223 private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
224 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
225 ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
226 return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
229 private Messages createMessages(final NodeRef nodeRef) {
230 final List<Message> messages = new ArrayList<>();
231 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
232 new BundleRemoveFlowCaseBuilder()
233 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
236 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
237 new BundleRemoveGroupCaseBuilder()
238 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
240 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
241 return new MessagesBuilder().setMessage(messages).build();
244 private Flow getDeleteAllFlow() {
245 final FlowBuilder flowBuilder = new FlowBuilder();
246 flowBuilder.setTableId(OFConstants.OFPTT_ALL);
247 return flowBuilder.build();
250 private Group getDeleteAllGroup() {
251 final GroupBuilder groupBuilder = new GroupBuilder();
252 groupBuilder.setGroupType(GroupTypes.GroupAll);
253 groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
254 return groupBuilder.build();
257 private class ArbitratorReconciliationTask implements Callable<Boolean> {
258 final DeviceInfo deviceInfo;
260 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
261 this.deviceInfo = deviceInfo;
265 public Boolean call() {
266 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
267 .augmentation(FlowCapableNode.class);
268 String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
269 BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
270 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
271 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
273 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
274 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
275 .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
277 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
278 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
279 .setType(BundleControlType.ONFBCTOPENREQUEST).build();
281 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
282 .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
283 .setMessages(createMessages(nodeRef)).build();
285 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
286 .controlBundle(closeBundleInput);
288 ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
289 .transformAsync(closeBundle, rpcResult -> salBundleService
290 .controlBundle(openBundleInput), MoreExecutors.directExecutor());
292 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
293 .transformAsync(openBundleMessagesFuture, rpcResult -> {
294 if (rpcResult.isSuccessful()) {
295 return salBundleService
296 .addBundleMessages(addBundleMessagesInput);
298 return Futures.immediateFuture(null);
299 }, MoreExecutors.directExecutor());
300 BigInteger nodeId = getDpnIdFromNodeName(node);
302 if (addBundleMessagesFuture.get().isSuccessful()) {
303 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
304 Futures.immediateFuture(null)));
305 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
306 + " for application programming.", nodeId);
309 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
312 } catch (InterruptedException | ExecutionException e) {
313 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
319 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
320 private final BigInteger nodeId;
322 private CommitActiveBundleCallback(final BigInteger nodeId) {
323 this.nodeId = nodeId;
327 public void onSuccess(RpcResult<?> rpcResult) {
328 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
329 bundleIdMap.remove(nodeId);
333 public void onFailure(Throwable throwable) {
334 LOG.error("Error while performing arbitrator reconciliation for device {}",
339 private <D> Function<RpcResult<D>,
340 RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
342 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
344 List<RpcError> errors = new ArrayList<>();
345 if (!input.isSuccessful()) {
346 errors.addAll(input.getErrors());
347 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
349 resultSink = RpcResultBuilder.success();
352 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
353 .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
355 return resultSink.build();
359 private void registerRpc(DeviceInfo node) {
360 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
361 .child(Node.class, new NodeKey(node.getNodeId()));
362 LOG.debug("The path is registered : {}", path);
363 routedRpcReg.registerPath(NodeContext.class, path);
366 private void deregisterRpc(DeviceInfo node) {
367 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
368 new NodeKey(node.getNodeId()));
369 LOG.debug("The path is unregistered : {}", path);
370 routedRpcReg.unregisterPath(NodeContext.class, path);
373 private static class BundleDetails {
374 private final BundleId bundleId;
375 private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
377 BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
378 this.bundleId = bundleId;
379 this.result = result;
382 public BundleId getBundleId() {
386 public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
391 private BigInteger getDpnIdFromNodeName(String nodeName) {
392 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
393 return new BigInteger(dpnId);