2 * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.arbitratorreconciliation.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Function;
13 import com.google.common.collect.ImmutableClassToInstanceMap;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.List;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.atomic.AtomicLong;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import javax.inject.Singleton;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
34 import org.opendaylight.mdsal.binding.api.RpcProviderService;
35 import org.opendaylight.openflowplugin.api.OFConstants;
36 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
37 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
38 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
39 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
40 import org.opendaylight.serviceutils.upgrade.UpgradeState;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessages;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundle;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCase;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCase;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundle;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.CommitActiveBundleOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundle;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleInput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.GetActiveBundleOutputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
75 import org.opendaylight.yangtools.concepts.Registration;
76 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.binding.Rpc;
79 import org.opendaylight.yangtools.yang.common.ErrorType;
80 import org.opendaylight.yangtools.yang.common.RpcError;
81 import org.opendaylight.yangtools.yang.common.RpcResult;
82 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
83 import org.opendaylight.yangtools.yang.common.Uint32;
84 import org.opendaylight.yangtools.yang.common.Uint64;
85 import org.osgi.service.component.annotations.Activate;
86 import org.osgi.service.component.annotations.Component;
87 import org.osgi.service.component.annotations.Deactivate;
88 import org.osgi.service.component.annotations.Reference;
89 import org.slf4j.Logger;
90 import org.slf4j.LoggerFactory;
93 @Component(service = { })
94 public final class ArbitratorReconciliationManagerImpl implements ReconciliationNotificationListener, AutoCloseable {
95 private static final Logger LOG = LoggerFactory.getLogger(ArbitratorReconciliationManagerImpl.class);
96 private static final AtomicLong BUNDLE_ID = new AtomicLong();
97 private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
98 private static final String SERVICE_NAME = "ArbitratorReconciliationManager";
99 private static final String SEPARATOR = ":";
101 private static final BundleRemoveFlowCase DELETE_ALL_FLOW = new BundleRemoveFlowCaseBuilder()
102 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder()
103 .setTableId(OFConstants.OFPTT_ALL)
106 private static final BundleRemoveGroupCase DELETE_ALL_GROUP = new BundleRemoveGroupCaseBuilder()
107 .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(new GroupBuilder()
108 .setGroupType(GroupTypes.GroupAll)
109 .setGroupId(new GroupId(OFConstants.OFPG_ALL))
113 // FIXME: use CM to control this constant
114 private static final int THREAD_POOL_SIZE = 4;
115 // FIXME: use CM to control this constant
116 private static final int ARBITRATOR_RECONCILIATION_PRIORITY =
117 Integer.getInteger("arbitrator.reconciliation.manager.priority", 0 /*default*/);
119 private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
120 private final Map<Uint64, BundleDetails> bundleIdMap = new ConcurrentHashMap<>();
121 private final ConcurrentMap<String, Registration> rpcRegistrations = new ConcurrentHashMap<>();
123 private final RpcProviderService rpcProviderService;
124 private final UpgradeState upgradeState;
125 private final NotificationRegistration registration;
126 private final AddBundleMessages addBundleMessages;
127 private final ControlBundle controlBundle;
131 public ArbitratorReconciliationManagerImpl(@Reference final ReconciliationManager reconciliationManager,
132 @Reference final RpcProviderService rpcProviderService, @Reference final RpcConsumerRegistry rpcService,
133 @Reference final UpgradeState upgradeState) {
134 this.rpcProviderService = requireNonNull(rpcProviderService);
135 this.upgradeState = requireNonNull(upgradeState);
136 addBundleMessages = requireNonNull(rpcService.getRpc(AddBundleMessages.class));
137 controlBundle = requireNonNull(rpcService.getRpc(ControlBundle.class));
138 registration = reconciliationManager.registerService(this);
139 LOG.info("ArbitratorReconciliationManager has started successfully.");
145 public void close() throws Exception {
147 registration.close();
150 private ListenableFuture<RpcResult<CommitActiveBundleOutput>> commitActiveBundle(
151 final CommitActiveBundleInput input) {
152 final var nodeId = input.getNodeId();
153 final var details = bundleIdMap.get(nodeId);
154 if (details != null) {
155 final var rpcResult = controlBundle.invoke(new ControlBundleInputBuilder()
156 .setNode(input.getNode())
157 .setBundleId(details.bundleId)
158 .setFlags(BUNDLE_FLAGS)
159 .setType(BundleControlType.ONFBCTCOMMITREQUEST)
161 bundleIdMap.put(nodeId, new BundleDetails(details.bundleId, rpcResult));
163 Futures.addCallback(rpcResult, new CommitActiveBundleCallback(nodeId), MoreExecutors.directExecutor());
164 return Futures.transform(rpcResult, createRpcResultCondenser("committed active bundle"),
165 MoreExecutors.directExecutor());
167 return RpcResultBuilder.success(new CommitActiveBundleOutputBuilder()
168 .setResult(null).build())
169 .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
170 null, "No active bundle found for the node" + nodeId))).buildFuture();
173 private ListenableFuture<RpcResult<GetActiveBundleOutput>> getActiveBundle(final GetActiveBundleInput input) {
174 Uint64 nodeId = input.getNodeId();
175 BundleDetails bundleDetails = bundleIdMap.get(nodeId);
176 if (bundleDetails != null) {
178 //This blocking call is used to prevent the applications from pushing flows and groups via the default
179 // pipeline when the commit bundle is ongoing.
180 bundleDetails.result.get();
181 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder()
182 .setResult(bundleDetails.bundleId)
185 } catch (InterruptedException | ExecutionException e) {
186 return RpcResultBuilder.<GetActiveBundleOutput>failed()
187 .withRpcErrors(List.of(RpcResultBuilder.newError(ErrorType.APPLICATION,
188 null, e.getMessage()))).buildFuture();
191 return RpcResultBuilder.success(new GetActiveBundleOutputBuilder().setResult(null).build()).buildFuture();
195 public ListenableFuture<Boolean> startReconciliation(final DeviceInfo node) {
197 if (upgradeState.isUpgradeInProgress()) {
198 LOG.trace("Starting arbitrator reconciliation for node {}", node.getDatapathId());
199 return reconcileConfiguration(node);
201 LOG.trace("arbitrator reconciliation is disabled");
202 return FluentFutures.immediateTrueFluentFuture();
206 public ListenableFuture<Boolean> endReconciliation(final DeviceInfo node) {
207 Uint64 datapathId = node.getDatapathId();
208 LOG.trace("Stopping arbitrator reconciliation for node {}", datapathId);
209 bundleIdMap.remove(datapathId);
211 return FluentFutures.immediateTrueFluentFuture();
215 public int getPriority() {
216 return ARBITRATOR_RECONCILIATION_PRIORITY;
220 public String getName() {
225 public ResultState getResultState() {
226 return ResultState.DONOTHING;
230 private static Messages createMessages(final NodeRef nodeRef) {
231 final var messages = List.of(
232 new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_FLOW).build(),
233 new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(DELETE_ALL_GROUP).build());
234 LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
235 return new MessagesBuilder().setMessage(messages).build();
238 private ListenableFuture<Boolean> reconcileConfiguration(final DeviceInfo node) {
239 LOG.info("Triggering arbitrator reconciliation for device {}", node.getDatapathId());
240 return Futures.submit(new ArbitratorReconciliationTask(node), executor);
243 private final class ArbitratorReconciliationTask implements Callable<Boolean> {
244 private final DeviceInfo deviceInfo;
246 ArbitratorReconciliationTask(final DeviceInfo deviceInfo) {
247 this.deviceInfo = requireNonNull(deviceInfo);
251 public Boolean call() {
252 InstanceIdentifier<FlowCapableNode> nodeIdentity = deviceInfo.getNodeInstanceIdentifier()
253 .augmentation(FlowCapableNode.class);
254 String node = nodeIdentity.firstKeyOf(Node.class).getId().getValue();
255 final var bundleIdValue = new BundleId(Uint32.valueOf(BUNDLE_ID.getAndIncrement()));
256 LOG.debug("Triggering arbitrator reconciliation for device :{}", node);
257 final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
259 final var openBundleMessagesFuture = Futures.transformAsync(
260 controlBundle.invoke(new ControlBundleInputBuilder()
262 .setBundleId(bundleIdValue)
263 .setFlags(BUNDLE_FLAGS)
264 .setType(BundleControlType.ONFBCTCLOSEREQUEST)
266 rpcResult -> controlBundle.invoke(new ControlBundleInputBuilder()
268 .setBundleId(bundleIdValue)
269 .setFlags(BUNDLE_FLAGS)
270 .setType(BundleControlType.ONFBCTOPENREQUEST)
271 .build()), MoreExecutors.directExecutor());
273 final var addBundleMessagesFuture = Futures.transformAsync(openBundleMessagesFuture,
274 rpcResult -> rpcResult.isSuccessful()
275 ? addBundleMessages.invoke(new AddBundleMessagesInputBuilder()
277 .setBundleId(bundleIdValue)
278 .setFlags(BUNDLE_FLAGS)
279 .setMessages(createMessages(nodeRef))
281 : FluentFutures.immediateNullFluentFuture(), MoreExecutors.directExecutor());
282 final var nodeId = getDpnIdFromNodeName(node);
284 if (addBundleMessagesFuture.get().isSuccessful()) {
285 bundleIdMap.put(nodeId, new BundleDetails(bundleIdValue,
286 FluentFutures.immediateNullFluentFuture()));
287 LOG.debug("Arbitrator reconciliation initial task has been completed for node {} ", nodeId);
290 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId);
293 } catch (InterruptedException | ExecutionException e) {
294 LOG.error("Error while performing arbitrator reconciliation for device:{}", nodeId, e);
300 public final class CommitActiveBundleCallback implements FutureCallback<RpcResult<?>> {
301 private final Uint64 nodeId;
303 private CommitActiveBundleCallback(final Uint64 nodeId) {
304 this.nodeId = nodeId;
308 public void onSuccess(final RpcResult<?> rpcResult) {
309 LOG.debug("Completed arbitrator reconciliation for device:{}", nodeId);
310 bundleIdMap.remove(nodeId);
314 public void onFailure(final Throwable throwable) {
315 LOG.error("Error while performing arbitrator reconciliation for device {}", nodeId, throwable);
319 private static <D> Function<RpcResult<D>, RpcResult<CommitActiveBundleOutput>> createRpcResultCondenser(
320 final String action) {
322 final RpcResultBuilder<CommitActiveBundleOutput> resultSink;
324 final var errors = new ArrayList<RpcError>();
325 if (!input.isSuccessful()) {
326 errors.addAll(input.getErrors());
327 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed().withRpcErrors(errors);
329 resultSink = RpcResultBuilder.success();
332 resultSink = RpcResultBuilder.<CommitActiveBundleOutput>failed()
333 .withError(ErrorType.APPLICATION, "action of " + action + " failed");
335 return resultSink.build();
339 private void registerRpc(final DeviceInfo node) {
340 final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
341 LOG.debug("The path is registered : {}", path);
342 rpcRegistrations.put(node.getNodeId().getValue(), rpcProviderService.registerRpcImplementations(
343 ImmutableClassToInstanceMap.<Rpc<?, ?>>builder()
344 .put(GetActiveBundle.class, this::getActiveBundle)
345 .put(CommitActiveBundle.class, this::commitActiveBundle)
346 .build(), Set.of(path)));
349 private void deregisterRpc(final DeviceInfo node) {
350 final var path = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(node.getNodeId()));
351 LOG.debug("The path is unregistered : {}", path);
352 final var reg = rpcRegistrations.remove(node.getNodeId().getValue());
359 record BundleDetails(BundleId bundleId, ListenableFuture<RpcResult<ControlBundleOutput>> result) {
361 requireNonNull(bundleId);
362 requireNonNull(result);
366 private static Uint64 getDpnIdFromNodeName(final String nodeName) {
367 String dpnId = nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1);
368 return Uint64.valueOf(dpnId);