+ // update only if remote version is newer
+ final long remoteVersion = receivedBucket.getVersion();
+ final Long localVersion = versions.get(addr);
+ if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+ LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
+ remoteVersion);
+ continue;
+ }
+ newBuckets.put(addr, receivedBucket);
+ versions.put(addr, remoteVersion);
+ final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+ // Deal with DeathWatch subscriptions
+ final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+ final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+ if (!curRef.equals(prevRef)) {
+ prevRef.ifPresent(ref -> removeWatch(addr, ref));
+ curRef.ifPresent(ref -> addWatch(addr, ref));
+ }