BUG 7802: split out shard creation from produce transactions 95/53895/10
authorTomas Cere <tcere@cisco.com>
Mon, 27 Mar 2017 11:16:41 +0000 (13:16 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Sat, 1 Apr 2017 08:39:34 +0000 (08:39 +0000)
Change-Id: I33fa46791a6c80477f57badf3bd44c3d6c5a2f9e
Signed-off-by: Tomas Cere <tcere@cisco.com>
opendaylight/md-sal/samples/clustering-test-app/model/src/main/yang/odl-mdsal-lowlevel-control.yang
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/MdsalLowLevelTestProvider.java
opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java [new file with mode: 0644]

index 302fbbd3626e6e4ac80f0510d70acb7b5c3582d1..a10b3ce628154943f0a98d9f8adc396e3ff29ebe 100644 (file)
@@ -340,31 +340,44 @@ module odl-mdsal-lowlevel-control {
         }
     }
 
-    rpc become-prefix-leader {
-        description "Upon receiving this, the member shall ask the appropriate API
-            to become Leader of the given shard (presumably the llt:list-ints one,
-            created by produce-transactions) and return immediatelly.";
+    rpc create-prefix-shard {
+        description "Upon receiving this, the member creates a prefix shard at the instance-identifier, with replicas
+                on the required members.";
         input {
-            leaf shard-name {
-                description "TBD.
 
-                FIXME: Ask Java implementation developer about the format needed.";
+            leaf prefix {
                 mandatory true;
+                type instance-identifier;
+            }
+            leaf-list replicas {
+                min-elements 1;
                 type string;
             }
         }
-        // No output.
     }
 
-    rpc become-module-leader {
-        description "Upon receiving this, the member shall ask appropriate API
-            to become Leader of given config shard and return immediatelly.";
+    rpc remove-prefix-shard {
+        description "Upon receiving this, the member removes the prefix based shard identifier by this prefix.
+                This must be called from the same node that created the shard.";
+
+        input {
+            leaf prefix {
+                mandatory true;
+                type instance-identifier;
+            }
+        }
+    }
+
+
+    rpc become-prefix-leader {
+        description "Upon receiving this, the member shall ask the appropriate API
+            to become Leader of the given shard (presumably the llt:list-ints one,
+            created by produce-transactions) and return immediatelly.";
         input {
             leaf shard-name {
                 description "TBD.
 
-                FIXME: Ask Java implementation developer about the format needed.
-                TODO: Perhaps the names are compatible and one 'become-leader' would suffice?";
+                FIXME: Ask Java implementation developer about the format needed.";
                 mandatory true;
                 type string;
             }
index f47a66ee15c8f15c40f585d992e2cd1e7df89295..d653a2c30b851402ec992d12ae0c4a3e61551a26 100644 (file)
@@ -18,6 +18,7 @@ import java.util.concurrent.Future;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
+import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
@@ -37,11 +38,11 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
@@ -50,6 +51,7 @@ import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.l
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
@@ -86,6 +88,7 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     private final SchemaService schemaService;
     private final ClusterSingletonServiceProvider singletonService;
     private final DOMRpcProviderService domRpcService;
+    private final PrefixShardHandler prefixShardHandler;
 
     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
             new HashMap<>();
@@ -119,6 +122,8 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         this.distributedShardFactory = distributedShardFactory;
 
         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
+
+        prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
     }
 
     @Override
@@ -183,11 +188,6 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         return null;
     }
 
-    @Override
-    public Future<RpcResult<Void>> becomeModuleLeader(BecomeModuleLeaderInput input) {
-        return null;
-    }
-
     @Override
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
         return null;
@@ -211,7 +211,14 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
     }
 
     @Override
-    public Future<RpcResult<Void>> becomePrefixLeader(BecomePrefixLeaderInput input) {
+    public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
+        LOG.debug("remove-prefix-shard, input: {}", input);
+
+        return prefixShardHandler.onRemovePrefixShard(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
         return null;
     }
 
@@ -353,6 +360,13 @@ public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService
         return null;
     }
 
+    @Override
+    public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
+        LOG.debug("create-prefix-shard, input: {}", input);
+
+        return prefixShardHandler.onCreatePrefixShard(input);
+    }
+
     @Override
     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
         return null;
diff --git a/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java b/opendaylight/md-sal/samples/clustering-test-app/provider/src/main/java/org/opendaylight/controller/clustering/it/provider/impl/PrefixShardHandler.java
new file mode 100644 (file)
index 0000000..22e7700
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrefixShardHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class);
+
+    private final DistributedShardFactory shardFactory;
+    private final BindingNormalizedNodeSerializer serializer;
+
+    private final Map<YangInstanceIdentifier, DistributedShardRegistration> registrations =
+            Collections.synchronizedMap(new HashMap<>());
+
+    public PrefixShardHandler(final DistributedShardFactory shardFactory,
+                              final BindingNormalizedNodeSerializer serializer) {
+
+        this.shardFactory = shardFactory;
+        this.serializer = serializer;
+    }
+
+    public ListenableFuture<RpcResult<Void>> onCreatePrefixShard(final CreatePrefixShardInput input) {
+
+        final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+
+        final CompletionStage<DistributedShardRegistration> completionStage;
+        final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
+
+        try {
+             completionStage = shardFactory.createDistributedShard(
+                    new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, identifier),
+                    input.getReplicas().stream().map(MemberName::forName).collect(Collectors.toList()));
+
+            completionStage.thenAccept(registration -> {
+                LOG.debug("Shard[{}] created successfully.", identifier);
+                registrations.put(identifier, registration);
+                future.set(RpcResultBuilder.<Void>success().build());
+            });
+            completionStage.exceptionally(throwable -> {
+                LOG.warn("Shard[{}] creation failed:", identifier, throwable);
+
+                final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
+                        "Shard creation failed", "cluster-test-app", "", throwable);
+                future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+                return null;
+            });
+        } catch (final DOMDataTreeShardingConflictException e) {
+            LOG.warn("Unable to register shard for: {}.", identifier);
+
+            final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
+                    "Sharding conflict", "cluster-test-app", "", e);
+            future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+        }
+
+        return future;
+    }
+
+    public ListenableFuture<RpcResult<Void>> onRemovePrefixShard(final RemovePrefixShardInput input) {
+
+        final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
+        final DistributedShardRegistration registration = registrations.get(identifier);
+
+        if (registration == null) {
+            final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "registration-missing",
+                    "No shard registered at this prefix.");
+            return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+        }
+
+        final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+
+        final CompletionStage<Void> close = registration.close();
+        close.thenRun(() -> future.set(RpcResultBuilder.<Void>success().build()));
+        close.exceptionally(throwable -> {
+            LOG.warn("Shard[{}] removal failed:", identifier, throwable);
+
+            final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "remove-shard-failed",
+                    "Shard removal failed", "cluster-test-app", "", throwable);
+            future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+            return null;
+        });
+
+        return future;
+    }
+}