* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.remote.rpc.registry.gossip;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.SetMultimap;
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
/**
* A store that syncs its data across nodes in the cluster.
*/
private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
- private final RemoteRpcProviderConfig config;
+ private final RemoteOpsProviderConfig config;
private final String persistenceId;
/**
private Integer incarnation;
private boolean persisting;
- protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
- this.config = Preconditions.checkNotNull(config);
- this.initialData = Preconditions.checkNotNull(initialData);
- this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
+ this.config = requireNonNull(config);
+ this.initialData = requireNonNull(initialData);
+ this.persistenceId = requireNonNull(persistenceId);
}
static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
return;
}
- if (message instanceof ExecuteInActor) {
- ((ExecuteInActor) message).accept(this);
+ if (message instanceof ExecuteInActor execute) {
+ execute.accept(this);
} else if (GET_BUCKET_VERSIONS == message) {
// FIXME: do we need to send ourselves?
getSender().tell(ImmutableMap.copyOf(versions), getSelf());
- } else if (message instanceof Terminated) {
- actorTerminated((Terminated) message);
- } else if (message instanceof DeleteSnapshotsSuccess) {
- LOG.debug("{}: got command: {}", persistenceId(), message);
- } else if (message instanceof DeleteSnapshotsFailure) {
- LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
- ((DeleteSnapshotsFailure) message).cause());
+ } else if (message instanceof Terminated terminated) {
+ actorTerminated(terminated);
+ } else if (message instanceof DeleteSnapshotsSuccess deleteSuccess) {
+ LOG.debug("{}: got command: {}", persistenceId(), deleteSuccess);
+ } else if (message instanceof DeleteSnapshotsFailure deleteFailure) {
+ LOG.warn("{}: failed to delete prior snapshots", persistenceId(), deleteFailure.cause());
} else {
LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
private void handleSnapshotMessage(final Object message) {
- if (message instanceof SaveSnapshotFailure) {
- LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
+ if (message instanceof SaveSnapshotFailure saveFailure) {
+ LOG.error("{}: failed to persist state", persistenceId(), saveFailure.cause());
persisting = false;
self().tell(PoisonPill.getInstance(), ActorRef.noSender());
- } else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{}: got command: {}", persistenceId(), message);
- SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
- deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
- saved.metadata().timestamp() - 1, 0L, 0L));
+ } else if (message instanceof SaveSnapshotSuccess saveSuccess) {
+ LOG.debug("{}: got command: {}", persistenceId(), saveSuccess);
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), saveSuccess.metadata().timestamp() - 1,
+ 0L, 0L));
persisting = false;
unstash();
} else {
incarnation = 0;
}
- this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
+ this.localBucket = new LocalBucket<>(incarnation, initialData);
initialData = null;
LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
persisting = true;
saveSnapshot(incarnation);
- } else if (message instanceof SnapshotOffer) {
- incarnation = (Integer) ((SnapshotOffer)message).snapshot();
+ } else if (message instanceof SnapshotOffer snapshotOffer) {
+ incarnation = (Integer) snapshotOffer.snapshot();
LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
} else {
LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
}
}
- protected final RemoteRpcProviderConfig getConfig() {
+ protected final RemoteOpsProviderConfig getConfig() {
return config;
}
if (bumpIncarnation) {
LOG.debug("Version wrapped. incrementing incarnation");
- Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+ verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
incarnation = incarnation + 1;
persisting = true;
versions.remove(addr);
final Bucket<T> bucket = remoteBuckets.remove(addr);
if (bucket != null) {
- LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
onBucketRemoved(addr, bucket);
}
}
}
private LocalBucket<T> getLocalBucket() {
- Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
+ checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
return localBucket;
}
}