described in the Raft paper.";
}
+ rpc add-prefix-shard-replica {
+ input {
+ leaf shard-prefix {
+ mandatory true;
+ type instance-identifier;
+ }
+
+ leaf data-store-type {
+ mandatory true;
+ type data-store-type;
+ description "The type of the data store to which the replica belongs";
+ }
+ }
+
+ description "Adds a replica of a shard to this node and joins it to an existing cluster. There must already be
+ a shard existing on another node with a leader. This RPC first contacts peer member seed nodes
+ searching for a shard. When found, an AddServer message is sent to the shard leader and applied as
+ described in the Raft paper.";
+ }
+
+ rpc remove-prefix-shard-replica {
+ input {
+ leaf shard-prefix {
+ mandatory true;
+ type instance-identifier;
+ }
+ leaf member-name {
+ mandatory true;
+ type string;
+ description "The cluster member from which the shard replica should be removed";
+ }
+
+ leaf data-store-type {
+ mandatory true;
+ type data-store-type;
+ description "The type of the data store to which the replica belongs";
+ }
+ }
+
+ description "Removes an existing replica of a prefix shard from this node via the RemoveServer mechanism as
+ described in the Raft paper.";
+ }
+
rpc add-replicas-for-all-shards {
output {
uses shard-result-output;
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-test-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller.samples</groupId>
+ <artifactId>clustering-it-model</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<!-- Akka -->
<dependency>
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DistributedDataStoreInterface configDataStore;
private final DistributedDataStoreInterface operDataStore;
+ private final BindingNormalizedNodeSerializer serializer;
public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
- DistributedDataStoreInterface operDataStore) {
+ DistributedDataStoreInterface operDataStore,
+ BindingNormalizedNodeSerializer serializer) {
this.configDataStore = configDataStore;
this.operDataStore = operDataStore;
+ this.serializer = serializer;
}
@Override
return returnFuture;
}
+ @Override
+ public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
+
+ final InstanceIdentifier<?> identifier = input.getShardPrefix();
+ if (identifier == null) {
+ return newFailedRpcResultFuture("A valid shard identifier must be specified");
+ }
+
+ final DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType);
+
+ final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix));
+ Futures.addCallback(future, new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(Success success) {
+ LOG.info("Successfully added replica for shard {}", prefix);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) {
+
+ final InstanceIdentifier<?> identifier = input.getShardPrefix();
+ if (identifier == null) {
+ return newFailedRpcResultFuture("A valid shard identifier must be specified");
+ }
+
+ final DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ final String memberName = input.getMemberName();
+ if (Strings.isNullOrEmpty(memberName)) {
+ return newFailedRpcResultFuture("A valid member name must be specified");
+ }
+
+ LOG.info("Removing replica for shard {} memberName {}, datastoreType {}",
+ identifier, memberName, dataStoreType);
+ final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
+ new RemovePrefixShardReplica(prefix, MemberName.forName(memberName)));
+ Futures.addCallback(future, new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(final Success success) {
+ LOG.info("Successfully removed replica for shard {}", prefix);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
+ }
+
@Override
public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
LOG.info("Adding replicas for all shards");
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">
+ xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+ odl:use-default-for-reference-types="true">
<!-- ClusterAdminRpcService -->
<reference id="operationalDatastore" interface="org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface"
odl:type="distributed-operational"/>
+ <reference id="normalizedNodeSerializer" interface="org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer"/>
+
<bean id="clusterAdminService" class="org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcService">
<argument ref="configDatastore"/>
<argument ref="operationalDatastore"/>
+ <argument ref="normalizedNodeSerializer"/>
</bean>
<odl:rpc-implementation ref="clusterAdminService"/>
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
String fileName = "target/testBackupDatastore";
new File(fileName).delete();
- ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
+ ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore(), null);
RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder()
.setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
}
+ @Test
+ public void testAddRemovePrefixShardReplica() throws Exception {
+ String name = "testAddPrefixShardReplica";
+ String moduleShardsConfig = "module-shards-default.conf";
+
+ final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).build();
+ final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).build();
+ final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).build();
+
+ member1.waitForMembersUp("member-2", "member-3");
+ replicaNode2.kit().waitForMembersUp("member-1", "member-3");
+ replicaNode3.kit().waitForMembersUp("member-1", "member-2");
+
+ final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
+
+ shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
+ "prefix", Collections.singleton(MEMBER_1))),
+ ActorRef.noSender());
+
+ member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
+
+ final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
+ final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
+ Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
+
+ addPrefixShardReplica(replicaNode2, identifier, serializer,
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
+
+ addPrefixShardReplica(replicaNode3, identifier, serializer,
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
+
+ verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
+ "member-2", "member-3");
+
+ removePrefixShardReplica(member1, identifier, "member-3", serializer,
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
+
+ verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
+ verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
+ "member-1");
+
+ removePrefixShardReplica(member1, identifier, "member-2", serializer,
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
+
+ verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
+ }
+
@Test
public void testAddShardReplica() throws Exception {
String name = "testAddShardReplica";
.moduleShardsConfig("module-shards-cars-member-1.conf").build();
ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
- memberNode.operDataStore());
+ memberNode.operDataStore(), null);
RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder()
.setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
assertEquals("Data node", expCarsNode, optional.get());
}
+ private void addPrefixShardReplica(final MemberNode memberNode,
+ final InstanceIdentifier<?> identifier,
+ final BindingNormalizedNodeSerializer serializer,
+ final String shardName,
+ final String... peerMemberNames) throws Exception {
+
+ final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
+ .setShardPrefix(identifier)
+ .setDataStoreType(DataStoreType.Config).build();
+
+ final ClusterAdminRpcService service =
+ new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
+
+ final RpcResult<Void> rpcResult = service.addPrefixShardReplica(input).get(10, TimeUnit.SECONDS);
+ verifySuccessfulRpcResult(rpcResult);
+
+ verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
+ Optional<ActorRef> optional = memberNode.configDataStore().getActorContext().findLocalShard(shardName);
+ assertTrue("Replica shard not present", optional.isPresent());
+ }
+
+ private void removePrefixShardReplica(final MemberNode memberNode,
+ final InstanceIdentifier<?> identifier,
+ final String removeFromMember,
+ final BindingNormalizedNodeSerializer serializer,
+ final String shardName,
+ final String... peerMemberNames) throws Exception {
+ final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
+ .setDataStoreType(DataStoreType.Config)
+ .setShardPrefix(identifier)
+ .setMemberName(removeFromMember).build();
+
+ final ClusterAdminRpcService service =
+ new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
+
+ final RpcResult<Void> rpcResult = service.removePrefixShardReplica(input).get(10, TimeUnit.SECONDS);
+ verifySuccessfulRpcResult(rpcResult);
+
+ verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
+ }
+
private static void doAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames)
throws Exception {
memberNode.waitForMembersUp(peerMemberNames);
ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
- memberNode.operDataStore());
+ memberNode.operDataStore(), null);
RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName)
.setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
// Invoke RPC service on member-3 to remove it's local shard
ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore());
+ replicaNode3.operDataStore(), null);
RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder()
.setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build())
// Invoke RPC service on member-1 to remove member-2
ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore());
+ leaderNode1.operDataStore(), null);
rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().setShardName("cars")
.setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).get(10, TimeUnit.SECONDS);
// Invoke RPC service on leader member-1 to remove it's local shard
ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore());
+ leaderNode1.operDataStore(), null);
RpcResult<Void> rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder()
.setShardName("cars").setMemberName("member-1").setDataStoreType(DataStoreType.Config).build())
newReplicaNode2.kit().expectMsgClass(Success.class);
ClusterAdminRpcService service = new ClusterAdminRpcService(newReplicaNode2.configDataStore(),
- newReplicaNode2.operDataStore());
+ newReplicaNode2.operDataStore(), null);
RpcResult<AddReplicasForAllShardsOutput> rpcResult =
service.addReplicasForAllShards().get(10, TimeUnit.SECONDS);
verifyRaftPeersPresent(replicaNode3.configDataStore(), "pets", "member-1", "member-2");
ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore());
+ replicaNode3.operDataStore(), null);
RpcResult<RemoveAllShardReplicasOutput> rpcResult = service3.removeAllShardReplicas(
new RemoveAllShardReplicasInputBuilder().setMemberName("member-3").build()).get(10, TimeUnit.SECONDS);
// Invoke RPC service on member-3 to change voting status
ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore());
+ replicaNode3.operDataStore(), null);
RpcResult<Void> rpcResult = service3
.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
// Invoke RPC service on member-3 to change voting status
ClusterAdminRpcService service = new ClusterAdminRpcService(leaderNode.configDataStore(),
- leaderNode.operDataStore());
+ leaderNode.operDataStore(), null);
RpcResult<Void> rpcResult = service
.changeMemberVotingStatesForShard(new ChangeMemberVotingStatesForShardInputBuilder()
// Invoke RPC service on member-3 to change voting status
ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore());
+ replicaNode3.operDataStore(), null);
RpcResult<ChangeMemberVotingStatesForAllShardsOutput> rpcResult = service3.changeMemberVotingStatesForAllShards(
new ChangeMemberVotingStatesForAllShardsInputBuilder().setMemberVotingState(ImmutableList.of(
new SimpleEntry<>("member-2", TRUE), new SimpleEntry<>("member-3", FALSE));
ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
- replicaNode3.operDataStore());
+ replicaNode3.operDataStore(), null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service3.flipMemberVotingStatesForAllShards()
.get(10, TimeUnit.SECONDS);
assertEquals("Expected raft state", RaftState.Follower.toString(), raftState.getRaftState()));
ClusterAdminRpcService service1 = new ClusterAdminRpcService(replicaNode1.configDataStore(),
- replicaNode1.operDataStore());
+ replicaNode1.operDataStore(), null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
.get(10, TimeUnit.SECONDS);
new SimpleEntry<>("member-6", FALSE));
ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
- leaderNode1.operDataStore());
+ leaderNode1.operDataStore(), null);
RpcResult<FlipMemberVotingStatesForAllShardsOutput> rpcResult = service1.flipMemberVotingStatesForAllShards()
.get(10, TimeUnit.SECONDS);
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * A message sent to the ShardManager to dynamically add a new local shard
+ * that is a replica for an existing prefix shard that is already available
+ * in the cluster.
+ */
+public class AddPrefixShardReplica {
+
+ private final YangInstanceIdentifier prefix;
+
+ /**
+ * Constructor.
+ *
+ * @param prefix prefix of the shard that is to be locally replicated.
+ */
+
+ public AddPrefixShardReplica(@Nonnull final YangInstanceIdentifier prefix) {
+ this.prefix = Preconditions.checkNotNull(prefix, "prefix should not be null");
+ }
+
+ public YangInstanceIdentifier getShardPrefix() {
+ return this.prefix;
+ }
+
+ @Override
+ public String toString() {
+ return "AddPrefixShardReplica[prefix=" + prefix + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * A message sent to the ShardManager to dynamically remove a local prefix shard
+ * replica available in this node.
+ */
+public class RemovePrefixShardReplica {
+
+ private final YangInstanceIdentifier prefix;
+ private final MemberName memberName;
+
+ /**
+ * Constructor.
+ *
+ * @param prefix prefix of the local shard that is to be dynamically removed.
+ */
+ public RemovePrefixShardReplica(@Nonnull final YangInstanceIdentifier prefix,
+ @Nonnull final MemberName memberName) {
+ this.prefix = Preconditions.checkNotNull(prefix, "prefix should not be null");
+ this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
+ }
+
+ public YangInstanceIdentifier getShardPrefix() {
+ return prefix;
+ }
+
+ public MemberName getMemberName() {
+ return memberName;
+ }
+
+ @Override
+ public String toString() {
+ return "RemovePrefixShardReplica [prefix=" + prefix + ", memberName=" + memberName + "]";
+ }
+}
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
} else if (message instanceof PrefixShardCreated) {
onPrefixShardCreated((PrefixShardCreated) message);
} else if (message instanceof PrefixShardRemoved) {
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
}
}
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
final PrefixShardConfiguration config = message.getConfiguration();
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- config.getPrefix());
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
if (localShards.containsKey(shardName)) {
LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
final DOMDataTreeIdentifier prefix = message.getPrefix();
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
final ShardInformation shard = localShards.remove(shardId.getShardName());
configuration.removePrefixShardConfiguration(prefix);
return false;
}
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ // Create the localShard
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(),
+ response, getSender()), getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ });
+ }
+
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
+ private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+ final RemotePrimaryShardFound response, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if (existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ DatastoreContext datastoreContext = builder.build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
shardInfo = existingShardInfo;
}
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
+ private void execAddShard(final String shardName,
+ final ShardInformation shardInfo,
+ final RemotePrimaryShardFound response,
+ final boolean removeShardOnFailure,
+ final ActorRef sender) {
+
+ final String localShardAddress =
+ peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
.getShardLeaderElectionTimeout().duration());
- Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+ final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- String msg = String.format("AddServer request to leader %s for shard %s failed",
+ final String msg = String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName);
self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
});
}
+ private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+ LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+ primaryPath, getSender()), getTargetActor());
+ }
+ });
+ }
+
private void persistShardList() {
List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {