description "Creates a backup file of the datastore state";
}
+
+ rpc get-shard-role {
+ input {
+ leaf shard-name {
+ mandatory true;
+ type string;
+ description "The name of the shard for which to create a replica.";
+ }
+
+ leaf data-store-type {
+ mandatory true;
+ type data-store-type;
+ description "The type of the data store to which the replica belongs";
+ }
+ }
+
+ output {
+ leaf role {
+ type string;
+ description "Current role for the given shard, if not present the shard currently doesn't have a role";
+ }
+ }
+
+ description "Returns the current role for the requested module shard.";
+ }
+
+ rpc get-prefix-shard-role {
+ input {
+ leaf shard-prefix {
+ mandatory true;
+ type instance-identifier;
+ }
+
+ leaf data-store-type {
+ mandatory true;
+ type data-store-type;
+ description "The type of the data store to which the replica belongs";
+ }
+ }
+
+ output {
+ leaf role {
+ type string;
+ description "Current role for the given shard, if not present the shard currently doesn't have a role";
+ }
+ }
+
+ description "Returns the current role for the requested module shard.";
+ }
}
\ No newline at end of file
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
"Failed to change member voting states");
}
+ @Override
+ public Future<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
+ final String shardName = input.getShardName();
+ if (Strings.isNullOrEmpty(shardName)) {
+ return newFailedRpcResultFuture("A valid shard name must be specified");
+ }
+
+ DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ LOG.info("Getting role for shard {}, datastore type {}", shardName, dataStoreType);
+
+ final SettableFuture<RpcResult<GetShardRoleOutput>> returnFuture = SettableFuture.create();
+ ListenableFuture<GetShardRoleReply> future = sendMessageToShardManager(dataStoreType,
+ new GetShardRole(shardName));
+ Futures.addCallback(future, new FutureCallback<GetShardRoleReply>() {
+ @Override
+ public void onSuccess(final GetShardRoleReply reply) {
+ if (reply == null) {
+ returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
+ "No Shard role present. Please retry..").build());
+ return;
+ }
+ LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName);
+ final GetShardRoleOutputBuilder builder = new GetShardRoleOutputBuilder();
+ if (reply.getRole() != null) {
+ builder.setRole(reply.getRole());
+ }
+ returnFuture.set(newSuccessfulResult(builder.build()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
+ "Failed to get shard role.", failure).build());
+ }
+ });
+
+ return returnFuture;
+ }
+
+ @Override
+ public Future<RpcResult<GetPrefixShardRoleOutput>> getPrefixShardRole(final GetPrefixShardRoleInput input) {
+ final InstanceIdentifier<?> identifier = input.getShardPrefix();
+ if (identifier == null) {
+ return newFailedRpcResultFuture("A valid shard identifier must be specified");
+ }
+
+ final DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ LOG.info("Getting prefix shard role for shard: {}, datastore type {}", identifier, dataStoreType);
+
+ final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+ final String shardName = ClusterUtils.getCleanShardName(prefix);
+ final SettableFuture<RpcResult<GetPrefixShardRoleOutput>> returnFuture = SettableFuture.create();
+ ListenableFuture<GetShardRoleReply> future = sendMessageToShardManager(dataStoreType,
+ new GetShardRole(shardName));
+ Futures.addCallback(future, new FutureCallback<GetShardRoleReply>() {
+ @Override
+ public void onSuccess(final GetShardRoleReply reply) {
+ if (reply == null) {
+ returnFuture.set(ClusterAdminRpcService.<GetPrefixShardRoleOutput>newFailedRpcResultBuilder(
+ "No Shard role present. Please retry..").build());
+ return;
+ }
+
+ LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName);
+ final GetPrefixShardRoleOutputBuilder builder = new GetPrefixShardRoleOutputBuilder();
+ if (reply.getRole() != null) {
+ builder.setRole(reply.getRole());
+ }
+ returnFuture.set(newSuccessfulResult(builder.build()));
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ returnFuture.set(ClusterAdminRpcService.<GetPrefixShardRoleOutput>newFailedRpcResultBuilder(
+ "Failed to get shard role.", failure).build());
+ }
+ });
+
+ return returnFuture;
+ }
+
@Override
public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
LOG.debug("backupDatastore: {}", input);
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
}
+ @Test
+ public void testGetShardRole() throws Exception {
+ String name = "testGetShardRole";
+ String moduleShardsConfig = "module-shards-default-member-1.conf";
+
+ final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).build();
+
+ member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
+
+ final RpcResult<GetShardRoleOutput> successResult =
+ getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
+ verifySuccessfulRpcResult(successResult);
+ assertEquals("Leader", successResult.getResult().getRole());
+
+ final RpcResult<GetShardRoleOutput> failedResult =
+ getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
+
+ verifyFailedRpcResult(failedResult);
+
+ final ActorRef shardManager1 = member1.configDataStore().getActorContext().getShardManager();
+
+ shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
+ "prefix", Collections.singleton(MEMBER_1))),
+ ActorRef.noSender());
+
+ member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(),
+ ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
+
+ final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
+ final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
+ Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
+
+ final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
+ getPrefixShardRole(member1, identifier, serializer);
+
+ verifySuccessfulRpcResult(prefixSuccessResult);
+ assertEquals("Leader", prefixSuccessResult.getResult().getRole());
+
+ final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
+ Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
+
+ final RpcResult<GetPrefixShardRoleOutput> prefixFail =
+ getPrefixShardRole(member1, peopleId, serializer);
+
+ verifyFailedRpcResult(prefixFail);
+ }
+
+ @Test
+ public void testGetPrefixShardRole() throws Exception {
+ String name = "testGetPrefixShardRole";
+ String moduleShardsConfig = "module-shards-default-member-1.conf";
+
+ final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ .moduleShardsConfig(moduleShardsConfig).build();
+
+ member1.kit().waitUntilLeader(member1.configDataStore().getActorContext(), "default");
+
+
+ }
+
@Test
public void testModuleShardLeaderMovement() throws Exception {
String name = "testModuleShardLeaderMovement";
assertEquals("Data node", expCarsNode, optional.get());
}
+ private RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
+ final BindingNormalizedNodeSerializer serializer,
+ final String shardName) throws Exception {
+
+ final GetShardRoleInput input = new GetShardRoleInputBuilder()
+ .setDataStoreType(DataStoreType.Config)
+ .setShardName(shardName)
+ .build();
+
+ final ClusterAdminRpcService service =
+ new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
+
+ return service.getShardRole(input).get(10, TimeUnit.SECONDS);
+
+ }
+
+ private RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
+ final MemberNode memberNode,
+ final InstanceIdentifier<?> identifier,
+ final BindingNormalizedNodeSerializer serializer) throws Exception {
+
+ final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
+ .setDataStoreType(DataStoreType.Config)
+ .setShardPrefix(identifier)
+ .build();
+
+ final ClusterAdminRpcService service =
+ new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
+
+ return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
+
+ }
+
private void addPrefixShardReplica(final MemberNode memberNode,
final InstanceIdentifier<?> identifier,
final BindingNormalizedNodeSerializer serializer,
return rpcResult.getResult();
}
- private static void verifyFailedRpcResult(RpcResult<Void> rpcResult) {
+ private static void verifyFailedRpcResult(RpcResult<?> rpcResult) {
assertFalse("RpcResult", rpcResult.isSuccessful());
assertEquals("RpcResult errors size", 1, rpcResult.getErrors().size());
RpcError error = Iterables.getFirst(rpcResult.getErrors(), null);
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to the local ShardManager to request the current role for the given shard.
+ */
+public class GetShardRole {
+
+ private final String name;
+
+ public GetShardRole(final String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Reply to GetShardRole, containing the current role of the shard if present on the ShardManager.
+ */
+public class GetShardRoleReply {
+
+ private final String role;
+
+ public GetShardRoleReply(final String role) {
+ this.role = role;
+ }
+
+ public String getRole() {
+ return role;
+ }
+}
notifyOnShardInitializedCallbacks();
}
+ String getRole() {
+ return role;
+ }
+
void setFollowerSyncStatus(boolean syncStatus) {
this.followerSyncStatus = syncStatus;
}
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
+ } else if (message instanceof GetShardRole) {
+ onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
} else if (message instanceof DeleteSnapshotsFailure) {
}
}
+ private void onGetShardRole(final GetShardRole message) {
+ LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
+
+ final String name = message.getName();
+
+ final ShardInformation shardInformation = localShards.get(name);
+
+ if (shardInformation == null) {
+ LOG.info("{}: no shard information for {} found", persistenceId(), name);
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
+ return;
+ }
+
+ getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
+ }
+
private void onInitConfigListener() {
LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());