import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
public void onComplete(Throwable failure, Object response) {
if(failure != null) {
LOG.debug("Error sending message {} to {}", message, shardActor, failure);
- // TODO - queue for retry
} else {
LOG.debug("{} message to {} succeeded", message, shardActor, failure);
}
return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
}
- void unregisterCandidate(Entity entity) {
+ void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
LOG.debug("Unregistering candidate for {}", entity);
- executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
+ executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
registeredEntities.remove(entity);
}
}
protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
- return new EntityOwnershipShardPropsCreator();
+ return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName());
+ }
+
+ @VisibleForTesting
+ ActorRef getLocalEntityOwnershipShard() {
+ return localEntityOwnershipShard;
}
}