+
+ LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
+ }
+
+ LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+
+ onBucketsUpdated(newBuckets);
+ }
+
+ private void addWatch(final Address addr, final ActorRef ref) {
+ if (!watchedActors.containsKey(ref)) {
+ getContext().watch(ref);
+ LOG.debug("Watching {}", ref);
+ }
+ watchedActors.put(ref, addr);
+ }
+
+ private void removeWatch(final Address addr, final ActorRef ref) {
+ watchedActors.remove(ref, addr);
+ if (!watchedActors.containsKey(ref)) {
+ getContext().unwatch(ref);
+ LOG.debug("No longer watching {}", ref);
+ }
+ }
+
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);