- void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
-
- //Get local status from bucket store and send to remote
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
- //Find gossiper on remote system
- ActorSelection remoteRef = getContext().system().actorSelection(
- remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
-
- if(log.isDebugEnabled()) {
- log.debug("Sending bucket versions to [{}]", remoteRef);
- }
-
- futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
- }
-
- /**
- * Helper to send bucket versions received from local store
- * @param remote remote gossiper to send versions to
- * @param localVersions bucket versions received from local store
- */
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
-
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
- }
-
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
-
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
- }
-
- ///
- /// Private factories to create mappers
- ///
-
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- sendGossipStatusTo(remote, localVersions);
-
- }
- return null;
- }
- };
- }
-
- /**
- * Process bucket versions received from
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
- * Then this method compares remote bucket versions with local bucket versions.
- * <ul>
- * <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- * to remote
- * <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * to remote so that remote sends GossipEnvelop.
- * </ul>
- *
- * @param sender the remote member
- * @param status bucket versions from a remote member
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Address address : remoteVersions.keySet()){
-
- if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
- continue; //this condition is taken care of by above diffs
- }
- if (localVersions.get(address) < remoteVersions.get(address)) {
- localIsOlder.add(address);
- } else if (localVersions.get(address) > remoteVersions.get(address)) {
- localIsNewer.add(address);
- }
- }
-
- if (!localIsOlder.isEmpty()) {
- sendGossipStatusTo(sender, localVersions );
- }
-
- if (!localIsNewer.isEmpty()) {
- sendGossipTo(sender, localIsNewer);//send newer buckets to remote
- }
-
- }
- return null;
- }
- };
- }
-
- /**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
- * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
- * These buckets are sent to a remote member encapsulated in
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- *
- * @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object msg) {
- if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- if(log.isDebugEnabled()) {
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
- }
- GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
- sender.tell(envelope, getSelf());
- }
- return null;
- }
- };