}
}
- rpc become-prefix-leader {
- description "Upon receiving this, the member shall ask the appropriate API
- to become Leader of the given shard (presumably the llt:list-ints one,
- created by produce-transactions) and return immediatelly.";
+ rpc create-prefix-shard {
+ description "Upon receiving this, the member creates a prefix shard at the instance-identifier, with replicas
+ on the required members.";
input {
- leaf shard-name {
- description "TBD.
- FIXME: Ask Java implementation developer about the format needed.";
+ leaf prefix {
mandatory true;
+ type instance-identifier;
+ }
+ leaf-list replicas {
+ min-elements 1;
type string;
}
}
- // No output.
}
- rpc become-module-leader {
- description "Upon receiving this, the member shall ask appropriate API
- to become Leader of given config shard and return immediatelly.";
+ rpc remove-prefix-shard {
+ description "Upon receiving this, the member removes the prefix based shard identifier by this prefix.
+ This must be called from the same node that created the shard.";
+
+ input {
+ leaf prefix {
+ mandatory true;
+ type instance-identifier;
+ }
+ }
+ }
+
+
+ rpc become-prefix-leader {
+ description "Upon receiving this, the member shall ask the appropriate API
+ to become Leader of the given shard (presumably the llt:list-ints one,
+ created by produce-transactions) and return immediatelly.";
input {
leaf shard-name {
description "TBD.
- FIXME: Ask Java implementation developer about the format needed.
- TODO: Perhaps the names are compatible and one 'become-leader' would suffice?";
+ FIXME: Ask Java implementation developer about the format needed.";
mandatory true;
type string;
}
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
+import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomeModuleLeaderInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
private final SchemaService schemaService;
private final ClusterSingletonServiceProvider singletonService;
private final DOMRpcProviderService domRpcService;
+ private final PrefixShardHandler prefixShardHandler;
private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
new HashMap<>();
this.distributedShardFactory = distributedShardFactory;
registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
+
+ prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
}
@Override
return null;
}
- @Override
- public Future<RpcResult<Void>> becomeModuleLeader(BecomeModuleLeaderInput input) {
- return null;
- }
-
@Override
public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
return null;
}
@Override
- public Future<RpcResult<Void>> becomePrefixLeader(BecomePrefixLeaderInput input) {
+ public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
+ LOG.debug("remove-prefix-shard, input: {}", input);
+
+ return prefixShardHandler.onRemovePrefixShard(input);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
return null;
}
return null;
}
+ @Override
+ public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
+ LOG.debug("create-prefix-shard, input: {}", input);
+
+ return prefixShardHandler.onCreatePrefixShard(input);
+ }
+
@Override
public Future<RpcResult<Void>> deconfigureIdIntsShard() {
return null;
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.clustering.it.provider.impl;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
+import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrefixShardHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class);
+
+ private final DistributedShardFactory shardFactory;
+ private final BindingNormalizedNodeSerializer serializer;
+
+ private final Map<YangInstanceIdentifier, DistributedShardRegistration> registrations =
+ Collections.synchronizedMap(new HashMap<>());
+
+ public PrefixShardHandler(final DistributedShardFactory shardFactory,
+ final BindingNormalizedNodeSerializer serializer) {
+
+ this.shardFactory = shardFactory;
+ this.serializer = serializer;
+ }
+
+ public ListenableFuture<RpcResult<Void>> onCreatePrefixShard(final CreatePrefixShardInput input) {
+
+ final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+
+ final CompletionStage<DistributedShardRegistration> completionStage;
+ final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
+
+ try {
+ completionStage = shardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, identifier),
+ input.getReplicas().stream().map(MemberName::forName).collect(Collectors.toList()));
+
+ completionStage.thenAccept(registration -> {
+ LOG.debug("Shard[{}] created successfully.", identifier);
+ registrations.put(identifier, registration);
+ future.set(RpcResultBuilder.<Void>success().build());
+ });
+ completionStage.exceptionally(throwable -> {
+ LOG.warn("Shard[{}] creation failed:", identifier, throwable);
+
+ final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
+ "Shard creation failed", "cluster-test-app", "", throwable);
+ future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return null;
+ });
+ } catch (final DOMDataTreeShardingConflictException e) {
+ LOG.warn("Unable to register shard for: {}.", identifier);
+
+ final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
+ "Sharding conflict", "cluster-test-app", "", e);
+ future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ }
+
+ return future;
+ }
+
+ public ListenableFuture<RpcResult<Void>> onRemovePrefixShard(final RemovePrefixShardInput input) {
+
+ final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
+ final DistributedShardRegistration registration = registrations.get(identifier);
+
+ if (registration == null) {
+ final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "registration-missing",
+ "No shard registered at this prefix.");
+ return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ }
+
+ final SettableFuture<RpcResult<Void>> future = SettableFuture.create();
+
+ final CompletionStage<Void> close = registration.close();
+ close.thenRun(() -> future.set(RpcResultBuilder.<Void>success().build()));
+ close.exceptionally(throwable -> {
+ LOG.warn("Shard[{}] removal failed:", identifier, throwable);
+
+ final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "remove-shard-failed",
+ "Shard removal failed", "cluster-test-app", "", throwable);
+ future.set(RpcResultBuilder.<Void>failed().withRpcError(error).build());
+ return null;
+ });
+
+ return future;
+ }
+}