- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Address address : remoteVersions.keySet()){
-
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
- continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
- localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
- localIsNewer.add(address);
- else
- continue;
- }
-
- if (!localIsOlder.isEmpty())
- sendGossipStatusTo(sender, localVersions );
-
- if (!localIsNewer.isEmpty())
- sendGossipTo(sender, localIsNewer);//send newer buckets to remote
-
- }
- return null;
- }
- };