/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
return actor -> actor.updateRemoteBuckets(buckets);
}
+ static ExecuteInActor getLocalDataMessage() {
+ return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
+ }
+
+ static ExecuteInActor getRemoteBucketsMessage() {
+ return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
+ }
+
public final T getLocalData() {
return getLocalBucket().getData();
}
} else if (message instanceof SaveSnapshotSuccess) {
LOG.debug("{}: got command: {}", persistenceId(), message);
SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
- deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
saved.metadata().timestamp() - 1, 0L, 0L));
persisting = false;
unstash();
}
@Override
- protected final void handleRecover(final Object message) throws Exception {
+ protected final void handleRecover(final Object message) {
if (message instanceof RecoveryCompleted) {
if (incarnation != null) {
incarnation = incarnation + 1;
* @param address Remote address
* @param bucket Bucket removed
*/
- protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+ protected abstract void onBucketRemoved(Address address, Bucket<T> bucket);
/**
* Callback to subclasses invoked when the set of remote buckets is updated.
*
* @param newBuckets Map of address to new bucket. Never null, but can be empty.
*/
- protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
+ protected abstract void onBucketsUpdated(Map<Address, Bucket<T>> newBuckets);
/**
* Helper to collect all known buckets.
bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
onBucketRemoved(addr, bucket);
}
+ versions.remove(addr);
}
/**
versions.remove(addr);
final Bucket<T> bucket = remoteBuckets.remove(addr);
if (bucket != null) {
- LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
onBucketRemoved(addr, bucket);
}
}