import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A store that syncs its data across nodes in the cluster.
private static final Long NO_VERSION = -1L;
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Bucket owned by the node
private final RemoteRpcProviderConfig config;
- public BucketStore(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public BucketStore(RemoteRpcProviderConfig config){
+ this.config = Preconditions.checkNotNull(config);
}
@Override
selfAddress = provider.getDefaultAddress();
if ( provider instanceof ClusterActorRefProvider) {
- getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+ getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
}
}
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
} else {
if(log.isDebugEnabled()) {
log.debug("Unhandled message [{}]", message);
*/
void receiveGetAllBuckets(){
final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+ sender.tell(new GetAllBucketsReply<T>(getAllBuckets()), getSelf());
}
/**
*
* @return self owned + remote buckets
*/
- @SuppressWarnings("rawtypes")
- Map<Address, Bucket> getAllBuckets(){
- Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+ Map<Address, Bucket<T>> getAllBuckets(){
+ Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
all.put(selfAddress, new BucketImpl<>(localBucket));
*
* @param members requested members
*/
- @SuppressWarnings("rawtypes")
void receiveGetBucketsByMembers(Set<Address> members){
final ActorRef sender = getSender();
- Map<Address, Bucket> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+ Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
+ sender.tell(new GetBucketsByMembersReply<T>(buckets), getSelf());
}
/**
* @param members requested members
* @return buckets for requested memebers
*/
- @SuppressWarnings("rawtypes")
- Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
- Map<Address, Bucket> buckets = new HashMap<>();
+ Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+ Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
if (members.contains(selfAddress)) {
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- @SuppressWarnings({ "rawtypes", "unchecked" })
- void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+ void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets){
log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty())
{
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
- for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+ for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()){
Long localVersion = versions.get(entry.getKey());
if (localVersion == null) {
}
}
- protected BucketImpl<T> getLocalBucket() {
+ public BucketImpl<T> getLocalBucket() {
return localBucket;
}
versions.put(selfAddress, localBucket.getVersion());
}
- protected Map<Address, Bucket<T>> getRemoteBuckets() {
+ public Map<Address, Bucket<T>> getRemoteBuckets() {
return remoteBuckets;
}
- @VisibleForTesting
- Map<Address, Long> getVersions() {
+ public Map<Address, Long> getVersions() {
return versions;
}
}