2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.JdkFutureAdapters;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.List;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PostConstruct;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.apache.aries.blueprint.annotation.service.Reference;
34 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.openflowplugin.api.OFConstants;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
38 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
40 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
41 import org.opendaylight.serviceutils.upgrade.UpgradeState;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
80 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
81 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcResult;
84 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
89 public class ArbitratorReconciliationManagerImpl implements ArbitratorReconcileService,
90 ReconciliationNotificationListener, AutoCloseable {
92 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
93 private static final int THREAD_POOL_SIZE = 4;
94 private static final AtomicLong BUNDLE_ID = new AtomicLong();
95 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
96 private static final int ARBITRATOR_RECONCILIATION_PRIORITY = Integer
97 .getInteger("arbitrator.reconciliation.manager.priority", 0/*default*/);
98 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
99 private static final String SEPARATOR = ":";
101 private final SalBundleService salBundleService;
102 private final ReconciliationManager reconciliationManager;
103 private final RoutedRpcRegistration routedRpcReg;
104 private final UpgradeState upgradeState;
105 private NotificationRegistration registration;
106 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
107 private final Map<BigInteger, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
110 public ArbitratorReconciliationManagerImpl(@Reference RpcProviderRegistry rpcRegistry,
111 @Reference ReconciliationManager reconciliationManager, @Reference UpgradeState upgradeState) {
112 Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry cannot be null !");
113 this.reconciliationManager = Preconditions.checkNotNull(reconciliationManager,
114 "ReconciliationManager cannot be null!");
115 this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
116 "RPC SalBundlService not found.");
117 this.routedRpcReg = rpcRegistry.addRoutedRpcImplementation(ArbitratorReconcileService.class,
119 this.upgradeState = Preconditions.checkNotNull(upgradeState, "UpgradeState cannot be null!");
123 public void start() {
124 registration = reconciliationManager.registerService(this);
125 LOG.info("ArbitratorReconciliationManager has started successfully.");
130 public void close() throws Exception {
132 if (registration != null) {
133 registration.close();
139 public ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
140 CommitActiveBundleInput input) {
141 BigInteger nodeId = input.getNodeId();
142 if (bundleIdMap.containsKey(nodeId)) {
143 BundleId bundleId = bundleIdMap.get(nodeId).getBundleId();
144 if (bundleId != null) {
145 final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
146 .setNode(input.getNode()).setBundleId(bundleId)
147 .setFlags(BUNDLE_FLAGS)
148 .setType(BundleControlType.ONFBCTCOMMITREQUEST).build();
149 ListenableFuture<RpcResult<ControlBundleOutput>> rpcResult = salBundleService
150 .controlBundle(commitBundleInput);
151 bundleIdMap.put(nodeId, new BundleDetails(bundleId, rpcResult));
152 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId),
153 MoreExecutors.directExecutor());
154 return Futures.transform(
156 this.<ControlBundleOutput>createRpcResultCondenser("committed active bundle"),
157 MoreExecutors.directExecutor());
160 return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
161 .setResult(null).build())
162 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
163 null, "No active bundle found for the node" + nodeId.toString()))).buildFuture();
167 public ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(GetActiveBundleInput input) {
168 BigInteger nodeId = input.getNodeId();
169 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
170 if (bundleDetails != null) {
172 //This blocking call is used to prevent the applications from pushing flows and groups via the default
173 // pipeline when the commit bundle is ongoing.
174 bundleDetails.getResult().get();
175 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
176 .setResult(bundleDetails.getBundleId()).build()).buildFuture();
177 } catch (InterruptedException | ExecutionException | NullPointerException e) {
178 return RpcResultBuilder.<GetActiveBundleOutput>failed()
179 .withRpcErrors(Collections.singleton(RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
180 null, e.getMessage()))).buildFuture();
183 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
184 .setResult(null).build()).buildFuture();
188 public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
190 if (upgradeState.isUpgradeInProgress()) {
191 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
192 return reconcileConfiguration(node);
194 LOG.trace("arbitrator reconciliation is disabled");
195 return Futures.immediateFuture(true);
199 public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
200 BigInteger datapathId = node.getDatapathId();
201 LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
202 bundleIdMap.remove(datapathId);
204 return Futures.immediateFuture(true);
208 public int getPriority() {
209 return ARBITRATOR_RECONCILIATION_PRIORITY;
213 public String getName() {
218 public ResultState getResultState() {
219 return ResultState.DONOTHING;
222 private ListenableFuture<Boolean> reconcileConfiguration(DeviceInfo node) {
223 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
224 ArbitratorReconciliationTask upgradeReconTask = new ArbitratorReconciliationTask(node);
225 return JdkFutureAdapters.listenInPoolThread(executor.submit(upgradeReconTask));
228 private Messages createMessages(final NodeRef nodeRef) {
229 final List<Message> messages = new ArrayList<>();
230 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
231 new BundleRemoveFlowCaseBuilder()
232 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build())
235 messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
236 new BundleRemoveGroupCaseBuilder()
237 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build())
239 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
240 return new MessagesBuilder().setMessage(messages).build();
243 private Flow getDeleteAllFlow() {
244 final FlowBuilder flowBuilder = new FlowBuilder();
245 flowBuilder.setTableId(OFConstants.OFPTT_ALL);
246 return flowBuilder.build();
249 private Group getDeleteAllGroup() {
250 final GroupBuilder groupBuilder = new GroupBuilder();
251 groupBuilder.setGroupType(GroupTypes.GroupAll);
252 groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
253 return groupBuilder.build();
256 private class ArbitratorReconciliationTask implements Callable<Boolean> {
257 final DeviceInfo deviceInfo;
259 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
260 this.deviceInfo = deviceInfo;
264 public Boolean call() {
265 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
266 .augmentation(FlowCapableNode.class);
267 String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
268 BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
269 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
270 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
272 final ControlBundleInput closeBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
273 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
274 .setType(BundleControlType.ONFBCTCLOSEREQUEST).build();
276 final ControlBundleInput openBundleInput = new ControlBundleInputBuilder().setNode(nodeRef)
277 .setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
278 .setType(BundleControlType.ONFBCTOPENREQUEST).build();
280 final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
281 .setNode(nodeRef).setBundleId(bundleIdValue).setFlags(BUNDLE_FLAGS)
282 .setMessages(createMessages(nodeRef)).build();
284 ListenableFuture<RpcResult<ControlBundleOutput>> closeBundle = salBundleService
285 .controlBundle(closeBundleInput);
287 ListenableFuture<RpcResult<ControlBundleOutput>> openBundleMessagesFuture = Futures
288 .transformAsync(closeBundle, rpcResult -> salBundleService
289 .controlBundle(openBundleInput), MoreExecutors.directExecutor());
291 ListenableFuture<RpcResult<AddBundleMessagesOutput>> addBundleMessagesFuture = Futures
292 .transformAsync(openBundleMessagesFuture, rpcResult -> {
293 if (rpcResult.isSuccessful()) {
294 return salBundleService
295 .addBundleMessages(addBundleMessagesInput);
297 return Futures.immediateFuture(null);
298 }, MoreExecutors.directExecutor());
299 BigInteger nodeId = getDpnIdFromNodeName(node);
301 if (addBundleMessagesFuture.get().isSuccessful()) {
302 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
303 Futures.immediateFuture(null)));
304 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} and open up"
305 + " for application programming.", nodeId);
308 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
311 } catch (InterruptedException | ExecutionException e) {
312 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
318 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
319 private final BigInteger nodeId;
321 private CommitActiveBundleCallback(final BigInteger nodeId) {
322 this.nodeId = nodeId;
326 public void onSuccess(RpcResult<?> rpcResult) {
327 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
328 bundleIdMap.remove(nodeId);
332 public void onFailure(Throwable throwable) {
333 LOG.error("Error while performing arbitrator reconciliation for device {}",
338 private <D> Function<RpcResult<D>,
339 RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(final String action) {
341 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
343 List<RpcError> errors = new ArrayList<>();
344 if (!input.isSuccessful()) {
345 errors.addAll(input.getErrors());
346 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
348 resultSink = RpcResultBuilder.success();
351 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
352 .withError(RpcError.ErrorType.APPLICATION, "action of " + action + " failed");
354 return resultSink.build();
358 private void registerRpc(DeviceInfo node) {
359 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class)
360 .child(Node.class, new NodeKey(node.getNodeId()));
361 LOG.debug("The path is registered : {}", path);
362 routedRpcReg.registerPath(NodeContext.class, path);
365 private void deregisterRpc(DeviceInfo node) {
366 KeyedInstanceIdentifier<Node, NodeKey> path = InstanceIdentifier.create(Nodes.class).child(Node.class,
367 new NodeKey(node.getNodeId()));
368 LOG.debug("The path is unregistered : {}", path);
369 routedRpcReg.unregisterPath(NodeContext.class, path);
372 private static class BundleDetails {
373 private final BundleId bundleId;
374 private final ListenableFuture<RpcResult<ControlBundleOutput>> result;
376 BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
377 this.bundleId = bundleId;
378 this.result = result;
381 public BundleId getBundleId() {
385 public ListenableFuture<RpcResult<ControlBundleOutput>> getResult() {
390 private BigInteger getDpnIdFromNodeName(String nodeName) {
391 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
392 return new BigInteger(dpnId);