this.entityOwnershipStatistics.init(getDataStore());
for(String peerId: getRaftActorContext().getPeerIds()) {
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
+ ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
peerIdToMemberNames.put(peerId, shardId.getMemberName());
}
}
private final String type;
private final String fullName;
- public ShardIdentifier(String shardName, MemberName memberName, String type) {
+ private ShardIdentifier(String shardName, MemberName memberName, String type) {
+ this.shardName = Preconditions.checkNotNull(shardName, "shardName should not be null");
+ this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
- Preconditions.checkNotNull(shardName, "shardName should not be null");
- Preconditions.checkNotNull(memberName, "memberName should not be null");
- Preconditions.checkNotNull(type, "type should not be null");
+ fullName = memberName.getName() + "-shard-" + shardName + "-" + type;
+ }
+
+ public static ShardIdentifier create(final String shardName, final MemberName memberName, final String type) {
+ return new ShardIdentifier(shardName, memberName, type);
+ }
- this.shardName = shardName;
- this.memberName = memberName;
- this.type = type;
+ public static ShardIdentifier fromShardIdString(final String shardIdString) {
+ final Matcher matcher = PATTERN.matcher(shardIdString);
+ Preconditions.checkArgument(matcher.matches(), "Invalid shard id \"%s\"", shardIdString);
- fullName = new StringBuilder(memberName.getName()).append("-shard-").append(shardName).append("-")
- .append(type).toString();
+ return new ShardIdentifier(matcher.group(2), MemberName.forName(matcher.group(1)), matcher.group(3));
}
@Override
@Override
public String toString() {
- //ensure the output of toString matches the pattern above
+ // ensure the output of toString matches the pattern above
return fullName;
}
- public static Builder builder(){
- return new Builder();
- }
-
public String getShardName() {
return shardName;
}
String actorName = sender.path().name();
//find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
- if (shardId.getShardName() == null) {
+ final ShardIdentifier shardId;
+ try {
+ shardId = ShardIdentifier.fromShardIdString(actorName);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("{}: ignoring actor {}", actorName, e);
return;
}
private void onGetSnapshotReply(GetSnapshotReply getSnapshotReply) {
LOG.debug("{}: Received {}", params.id, getSnapshotReply);
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
+ ShardIdentifier shardId = ShardIdentifier.fromShardIdString(getSnapshotReply.getId());
shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
remainingShardNames.remove(shardId.getShardName());
@Override
public void switchShardState(String shardId, String newState, long term) {
- final ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(
- Preconditions.checkNotNull(shardId, "Shard id may not be null")).build();
+ final ShardIdentifier identifier = ShardIdentifier.fromShardIdString(shardId);
LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardId, newState, term);
requestSwitchShardState(identifier, newState, term);
}
return peerAddresses;
}
- ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
- return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(shardManagerType).build();
+ ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
+ return ShardIdentifier.create(shardName, memberName, shardManagerType);
}
String getShardActorAddress(String shardName, MemberName memberName) {
@Override
public String resolve(String peerId) {
- if(peerId == null) {
+ if (peerId == null) {
return null;
}
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
+ ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
return getShardActorAddress(shardId.getShardName(), shardId.getMemberName());
}
}
private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
- protected final ShardIdentifier shardID = ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
- .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
+ protected final ShardIdentifier shardID = ShardIdentifier.create("inventory", MemberName.forName("member-1"),
+ "config" + NEXT_SHARD_NUM.getAndIncrement());
protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
String... peerMemberNames) throws Exception {
final Set<String> peerIds = Sets.newHashSet();
for(String p: peerMemberNames) {
- peerIds.add(ShardIdentifier.builder().memberName(MemberName.forName(p)).shardName(shardName).
- type(datastore.getActorContext().getDataStoreName()).build().toString());
+ peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
+ datastore.getActorContext().getDataStoreName()).toString());
}
- verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds, raftState.getPeerAddresses().keySet()));
+ verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
+ raftState.getPeerAddresses().keySet()));
}
public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
@Test
public void testPeerAddressResolved() throws Exception {
new ShardTestKit(getSystem()) {{
- ShardIdentifier peerID = ShardIdentifier.builder().memberName(MemberName.forName("member-2"))
- .shardName("inventory").type("config").build();
+ ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
public void testClusteredDataChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
String testName = "testClusteredDataChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.builder()
- .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-follower")))
- .shardName("inventory").type("config").build();
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
- final ShardIdentifier leaderShardID = ShardIdentifier.builder()
- .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-leader")))
- .shardName("inventory").type("config").build();
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
Shard.builder().id(followerShardID).
public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
String testName = "testClusteredDataTreeChangeListenerRegistration";
- final ShardIdentifier followerShardID = ShardIdentifier.builder()
- .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-follower")))
- .shardName("inventory").type("config").build();
+ final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
- final ShardIdentifier leaderShardID = ShardIdentifier.builder()
- .memberName(MemberName.forName(actorFactory.generateActorId(testName + "-leader")))
- .shardName("inventory").type("config").build();
+ final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
+ MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
Shard.builder().id(followerShardID).
private static final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
- .shardName("inventory").type("operational").build();
+ ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational");
private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
private static final TransactionType WO = TransactionType.WRITE_ONLY;
private static final ShardIdentifier SHARD_IDENTIFIER =
- ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
- .shardName("inventory").type("config").build();
+ ShardIdentifier.create("inventory", MemberName.forName("member-1"), "config");
private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
}
private static ShardIdentifier newShardId(String memberName) {
- return ShardIdentifier.builder().memberName(MemberName.forName(memberName)).shardName("entity-ownership").
- type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
+ return ShardIdentifier.create("entity-ownership", MemberName.forName(memberName),
+ "operational" + NEXT_SHARD_NUM.getAndIncrement());
}
private static class TestEntityOwnershipShard extends EntityOwnershipShard {
@Test
public void testBasic(){
- ShardIdentifier id = ShardIdentifier.builder().memberName(MemberName.forName("member-1"))
- .shardName("inventory").type("config").build();
+ ShardIdentifier id = ShardIdentifier.create("inventory", MemberName.forName("member-1"), "config");
assertEquals("member-1-shard-inventory-config", id.toString());
}
public void testFromShardIdString(){
String shardIdStr = "member-1-shard-inventory-config";
- ShardIdentifier id = ShardIdentifier.builder().fromShardIdString(shardIdStr).build();
+ ShardIdentifier id = ShardIdentifier.fromShardIdString(shardIdStr);
assertEquals("member-1", id.getMemberName().getName());
assertEquals("inventory", id.getShardName());
kit.watch(replyActor);
byte[] shard1Snapshot = new byte[]{1,2,3};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
- shardName("shard1").build().toString(), shard1Snapshot), ActorRef.noSender());
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
+ shard1Snapshot), ActorRef.noSender());
byte[] shard2Snapshot = new byte[]{4,5,6};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
- shardName("shard2").build().toString(), shard2Snapshot), ActorRef.noSender());
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard2", MEMBER_1, "config").toString(),
+ shard2Snapshot), ActorRef.noSender());
kit.expectNoMsg(FiniteDuration.create(500, TimeUnit.MILLISECONDS));
byte[] shard3Snapshot = new byte[]{7,8,9};
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
- shardName("shard3").build().toString(), shard3Snapshot), ActorRef.noSender());
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard3", MEMBER_1, "config").toString(),
+ shard3Snapshot), ActorRef.noSender());
DatastoreSnapshot datastoreSnapshot = kit.expectMsgClass(DatastoreSnapshot.class);
kit.watch(replyActor);
- replyActor.tell(new GetSnapshotReply(ShardIdentifier.builder().memberName(MEMBER_1).type("config").
- shardName("shard1").build().toString(), new byte[]{1,2,3}), ActorRef.noSender());
+ replyActor.tell(new GetSnapshotReply(ShardIdentifier.create("shard1", MEMBER_1, "config").toString(),
+ new byte[]{1,2,3}), ActorRef.noSender());
replyActor.tell(new Failure(new RuntimeException()), ActorRef.noSender());
InMemorySnapshotStore.clear();
if(mockShardActor == null) {
- mockShardName = new ShardIdentifier(Shard.DEFAULT_NAME, MEMBER_1, "config");
+ mockShardName = ShardIdentifier.create(Shard.DEFAULT_NAME, MEMBER_1, "config");
mockShardActor = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class),
mockShardName.toString());
}
}
private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
- String name = new ShardIdentifier(shardName, MemberName.forName(memberName), "config").toString();
+ String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString();
if(system == getSystem()) {
return actorFactory.createTestActor(Props.create(MessageCollectorActor.class), name);
}
assertTrue("Epxected ShardPeerAddressResolver", shardBuilder.getDatastoreContext().getShardRaftConfig().
getPeerAddressResolver() instanceof ShardPeerAddressResolver);
assertEquals("peerMembers", Sets.newHashSet(
- new ShardIdentifier("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
- new ShardIdentifier("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
+ ShardIdentifier.create("foo", MemberName.forName("member-5"), shardMrgIDSuffix).toString(),
+ ShardIdentifier.create("foo", MemberName.forName("member-6"), shardMrgIDSuffix).toString()),
shardBuilder.getPeerAddresses().keySet());
- assertEquals("ShardIdentifier", new ShardIdentifier("foo", MEMBER_1, shardMrgIDSuffix),
+ assertEquals("ShardIdentifier", ShardIdentifier.create("foo", MEMBER_1, shardMrgIDSuffix),
shardBuilder.getId());
assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("astronauts", MEMBER_2, "config").toString();
+ String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class).
withDispatcher(Dispatchers.DefaultDispatcherId()), name);
respondActor.underlyingActor().updateResponse(new RemoveServerReply(ServerChangeStatus.OK, null));
shardManager.tell(new RemoveShardReplica(Shard.DEFAULT_NAME, MEMBER_1), getRef());
final RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(respondActor, RemoveServer.class);
- assertEquals(new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString(),
+ assertEquals(ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString(),
removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Success.class);
}};
final ActorSystem system2 = newActorSystem("Member2");
Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
- String name = new ShardIdentifier("default", MEMBER_2, shardMrgIDSuffix).toString();
+ String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
final TestActorRef<MockRespondActor> mockShardLeaderActor =
TestActorRef.create(system2, Props.create(MockRespondActor.class), name);
newReplicaShardManager.tell(new RemoveShardReplica("default", MEMBER_1), getRef());
RemoveServer removeServer = MessageCollectorActor.expectFirstMatching(mockShardLeaderActor,
RemoveServer.class);
- String removeServerId = new ShardIdentifier("default", MEMBER_1, shardMrgIDSuffix).toString();
+ String removeServerId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
assertEquals("RemoveServer serverId", removeServerId, removeServer.getServerId());
expectMsgClass(duration("5 seconds"), Status.Success.class);
}};
put("astronauts", Arrays.asList("member-2")).
put("people", Arrays.asList("member-1", "member-2")).build());
- String shardId = ShardIdentifier.builder().shardName("default").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId = ShardIdentifier.create("default", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard = actorFactory.createTestActor(
MessageCollectorActor.props(), shardId);
put("shard1", Arrays.asList("member-1")).
put("shard2", Arrays.asList("member-1")).build());
- String shardId1 = ShardIdentifier.builder().shardName("shard1").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId1 = ShardIdentifier.create("shard1", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard1 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId1);
- String shardId2 = ShardIdentifier.builder().shardName("shard2").memberName(MEMBER_1).
- type(shardMrgIDSuffix).build().toString();
+ String shardId2 = ShardIdentifier.create("shard2", MEMBER_1, shardMrgIDSuffix).toString();
TestActorRef<MessageCollectorActor> shard2 = actorFactory.createTestActor(
MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), shardId2);
ShardPeerAddressResolver resolver = new ShardPeerAddressResolver(type, MEMBER_1);
MemberName memberName = MEMBER_2;
- String peerId = ShardIdentifier.builder().memberName(memberName).shardName("default").
- type(type).build().toString();
+ String peerId = ShardIdentifier.create("default", memberName, type).toString();
assertEquals("resolve", null, resolver.resolve(peerId));