From ec65758987d4232409bdd1fd9f71325392720894 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Mon, 27 Mar 2017 13:16:41 +0200 Subject: [PATCH] BUG 7802: split out shard creation from produce transactions Change-Id: I33fa46791a6c80477f57badf3bd44c3d6c5a2f9e Signed-off-by: Tomas Cere --- .../main/yang/odl-mdsal-lowlevel-control.yang | 39 ++++-- .../provider/MdsalLowLevelTestProvider.java | 28 +++-- .../it/provider/impl/PrefixShardHandler.java | 114 ++++++++++++++++++ 3 files changed, 161 insertions(+), 20 deletions(-) create mode 100644 opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java diff --git a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang index 302fbbd362..a10b3ce628 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang +++ b/opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang @@ -340,31 +340,44 @@ module odl-mdsal-lowlevel-control { } } - rpc become-prefix-leader { - description "Upon receiving this, the member shall ask the appropriate API - to become Leader of the given shard (presumably the llt:list-ints one, - created by produce-transactions) and return immediatelly."; + rpc create-prefix-shard { + description "Upon receiving this, the member creates a prefix shard at the instance-identifier, with replicas + on the required members."; input { - leaf shard-name { - description "TBD. - FIXME: Ask Java implementation developer about the format needed."; + leaf prefix { mandatory true; + type instance-identifier; + } + leaf-list replicas { + min-elements 1; type string; } } - // No output. } - rpc become-module-leader { - description "Upon receiving this, the member shall ask appropriate API - to become Leader of given config shard and return immediatelly."; + rpc remove-prefix-shard { + description "Upon receiving this, the member removes the prefix based shard identifier by this prefix. + This must be called from the same node that created the shard."; + + input { + leaf prefix { + mandatory true; + type instance-identifier; + } + } + } + + + rpc become-prefix-leader { + description "Upon receiving this, the member shall ask the appropriate API + to become Leader of the given shard (presumably the llt:list-ints one, + created by produce-transactions) and return immediatelly."; input { leaf shard-name { description "TBD. - FIXME: Ask Java implementation developer about the format needed. - TODO: Perhaps the names are compatible and one 'become-leader' would suffice?"; + FIXME: Ask Java implementation developer about the format needed."; mandatory true; type string; } diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java index f47a66ee15..d653a2c30b 100644 --- a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java @@ -18,6 +18,7 @@ import java.util.concurrent.Future; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; +import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler; import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; @@ -37,11 +38,11 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; -import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; @@ -50,6 +51,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; @@ -86,6 +88,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService private final SchemaService schemaService; private final ClusterSingletonServiceProvider singletonService; private final DOMRpcProviderService domRpcService; + private final PrefixShardHandler prefixShardHandler; private Map, DOMRpcImplementationRegistration> routedRegistrations = new HashMap<>(); @@ -119,6 +122,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService this.distributedShardFactory = distributedShardFactory; registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this); + + prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer); } @Override @@ -183,11 +188,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return null; } - @Override - public Future> becomeModuleLeader(BecomeModuleLeaderInput input) { - return null; - } - @Override public Future> removeShardReplica(RemoveShardReplicaInput input) { return null; @@ -211,7 +211,14 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService } @Override - public Future> becomePrefixLeader(BecomePrefixLeaderInput input) { + public Future> removePrefixShard(final RemovePrefixShardInput input) { + LOG.debug("remove-prefix-shard, input: {}", input); + + return prefixShardHandler.onRemovePrefixShard(input); + } + + @Override + public Future> becomePrefixLeader(final BecomePrefixLeaderInput input) { return null; } @@ -353,6 +360,13 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService return null; } + @Override + public Future> createPrefixShard(final CreatePrefixShardInput input) { + LOG.debug("create-prefix-shard, input: {}", input); + + return prefixShardHandler.onCreatePrefixShard(input); + } + @Override public Future> deconfigureIdIntsShard() { return null; diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java new file mode 100644 index 0000000000..22e7700d5c --- /dev/null +++ b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.clustering.it.provider.impl; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; +import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; +import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; +import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PrefixShardHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class); + + private final DistributedShardFactory shardFactory; + private final BindingNormalizedNodeSerializer serializer; + + private final Map registrations = + Collections.synchronizedMap(new HashMap<>()); + + public PrefixShardHandler(final DistributedShardFactory shardFactory, + final BindingNormalizedNodeSerializer serializer) { + + this.shardFactory = shardFactory; + this.serializer = serializer; + } + + public ListenableFuture> onCreatePrefixShard(final CreatePrefixShardInput input) { + + final SettableFuture> future = SettableFuture.create(); + + final CompletionStage completionStage; + final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix()); + + try { + completionStage = shardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, identifier), + input.getReplicas().stream().map(MemberName::forName).collect(Collectors.toList())); + + completionStage.thenAccept(registration -> { + LOG.debug("Shard[{}] created successfully.", identifier); + registrations.put(identifier, registration); + future.set(RpcResultBuilder.success().build()); + }); + completionStage.exceptionally(throwable -> { + LOG.warn("Shard[{}] creation failed:", identifier, throwable); + + final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", + "Shard creation failed", "cluster-test-app", "", throwable); + future.set(RpcResultBuilder.failed().withRpcError(error).build()); + return null; + }); + } catch (final DOMDataTreeShardingConflictException e) { + LOG.warn("Unable to register shard for: {}.", identifier); + + final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed", + "Sharding conflict", "cluster-test-app", "", e); + future.set(RpcResultBuilder.failed().withRpcError(error).build()); + } + + return future; + } + + public ListenableFuture> onRemovePrefixShard(final RemovePrefixShardInput input) { + + final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix()); + final DistributedShardRegistration registration = registrations.get(identifier); + + if (registration == null) { + final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "registration-missing", + "No shard registered at this prefix."); + return Futures.immediateFuture(RpcResultBuilder.failed().withRpcError(error).build()); + } + + final SettableFuture> future = SettableFuture.create(); + + final CompletionStage close = registration.close(); + close.thenRun(() -> future.set(RpcResultBuilder.success().build())); + close.exceptionally(throwable -> { + LOG.warn("Shard[{}] removal failed:", identifier, throwable); + + final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "remove-shard-failed", + "Shard removal failed", "cluster-test-app", "", throwable); + future.set(RpcResultBuilder.failed().withRpcError(error).build()); + return null; + }); + + return future; + } +} -- 2.36.6