import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import scala.concurrent.duration.FiniteDuration;
/**
};
private final boolean autoStartGossipTicks;
- private final RemoteRpcProviderConfig config;
+ private final RemoteOpsProviderConfig config;
/**
* All known cluster members.
private BucketStoreAccess bucketStore;
- Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
}
- Gossiper(final RemoteRpcProviderConfig config) {
+ Gossiper(final RemoteOpsProviderConfig config) {
this(config, Boolean.TRUE);
}
- public static Props props(final RemoteRpcProviderConfig config) {
+ public static Props props(final RemoteOpsProviderConfig config) {
return Props.create(Gossiper.class, config);
}
- static Props testProps(final RemoteRpcProviderConfig config) {
+ static Props testProps(final RemoteOpsProviderConfig config) {
return Props.create(Gossiper.class, config, Boolean.FALSE);
}
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+ bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
if (provider instanceof ClusterActorRefProvider) {
cluster = Cluster.get(getContext().system());
}
@Override
- protected void handleReceive(final Object message) throws Exception {
+ protected void handleReceive(final Object message) {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (GOSSIP_TICK.equals(message)) {
* @param member who went down
*/
private void receiveMemberRemoveOrUnreachable(final Member member) {
+ LOG.debug("Received memberDown or Unreachable: {}", member);
+
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
* @param member the member to add
*/
private void receiveMemberUpOrReachable(final Member member) {
+ LOG.debug("Received memberUp or reachable: {}", member);
+
//ignore up notification for self
if (selfAddress.equals(member.address())) {
return;
*/
@VisibleForTesting
void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
- bucketStore.updateRemoteBuckets(buckets);
+ // filter this so we only handle buckets for known peers
+ bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
}
/**