import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
return;
}
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
- @Override
- public Object get() {
- return new LocalShardFound(shardInformation.getActor());
- }
- });
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
}
private void sendResponse(ShardInformation shardInformation, boolean doWait,
final ActorRef sender = getSender();
final ActorRef self = self();
- Runnable replyRunnable = new Runnable() {
- @Override
- public void run() {
- sender.tell(messageSupplier.get(), self);
- }
- };
+ Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
new OnShardInitialized(replyRunnable);
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
}
+ @VisibleForTesting
+ static MemberName memberToName(final Member member) {
+ return MemberName.forName(member.roles().iterator().next());
+ }
+
private void memberRemoved(ClusterEvent.MemberRemoved message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberExited(ClusterEvent.MemberExited message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
}
private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
checkReady();
}
- private void addPeerAddress(String memberName, Address address) {
+ private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
for(ShardInformation info : localShards.values()){
}
private void memberReachable(ClusterEvent.ReachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
}
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
- String memberName = message.member().roles().iterator().next();
+ MemberName memberName = memberToName(message.member());
LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
- private void markMemberUnavailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberUnavailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
}
- private void markMemberAvailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberAvailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null && info.isActiveMember()) {
- sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
- @Override
- public Object get() {
- String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
- new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
-
- return found;
- }
+ sendResponse(info, message.isWaitUntilReady(), true, () -> {
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ }
+
+ return found;
});
return;
* @param shardName
* @return
*/
- private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
*
*/
private void createLocalShards() {
- String memberName = this.cluster.getCurrentMemberName();
+ MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
* @param shardName
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<String> members = configuration.getMembersFromShardName(shardName);
+ Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
Map<String, String> peerAddresses = new HashMap<>();
- String currentMemberName = this.cluster.getCurrentMemberName();
+ MemberName currentMemberName = this.cluster.getCurrentMemberName();
- for(String memberName : members) {
- if(!currentMemberName.equals(memberName)) {
+ for (MemberName memberName : members) {
+ if (!currentMemberName.equals(memberName)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
peerAddresses.put(shardId.toString(), address);
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
- return SupervisorStrategy.resume();
- }
- }
+ (Function<Throwable, Directive>) t -> {
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+ return SupervisorStrategy.resume();
+ }
);
}
LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
- String currentMember = cluster.getCurrentMemberName();
+ final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
for (String shard : currentSnapshot.getShardList()) {