-
- //Get local status from bucket store and send to remote
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
- LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
-
- futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
- }
-
- ///
- /// Private factories to create mappers
- ///
-
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Process bucket versions received from
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
- * Then this method compares remote bucket versions with local bucket versions.
- * <ul>
- * <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- * to remote
- * <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * to remote so that remote sends GossipEnvelop.
- * </ul>
- *
- * @param sender the remote member
- * @param status bucket versions from a remote member
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
- Address address = entry.getKey();
- Long remoteVersion = entry.getValue();
- Long localVersion = localVersions.get(address);
- if (localVersion == null || remoteVersion == null) {
- //this condition is taken care of by above diffs
- continue;
- }
-
- if (localVersion < remoteVersion) {
- localIsOlder.add(address);
- } else if (localVersion > remoteVersion) {
- localIsNewer.add(address);
- }
- }
-
- if (!localIsOlder.isEmpty()) {
- sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
-
- if (!localIsNewer.isEmpty()) {
- //send newer buckets to remote
- sendGossipTo(sender, localIsNewer);
- }
- }
- return null;
- }
- };
- }
-
- /**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
- * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
- * These buckets are sent to a remote member encapsulated in
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- *
- * @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
- return new Mapper<Object, Void>() {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Void apply(final Object msg) {
- if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
- GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
- sender.tell(envelope, getSelf());
- }
- return null;
- }
- };