/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
/**
* A store that syncs its data across nodes in the cluster.
*/
private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
- private final RemoteRpcProviderConfig config;
+ private final RemoteOpsProviderConfig config;
private final String persistenceId;
/**
private Integer incarnation;
private boolean persisting;
- protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+ protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
this.initialData = Preconditions.checkNotNull(initialData);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
return actor -> actor.updateRemoteBuckets(buckets);
}
+ static ExecuteInActor getLocalDataMessage() {
+ return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
+ }
+
+ static ExecuteInActor getRemoteBucketsMessage() {
+ return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
+ }
+
public final T getLocalData() {
return getLocalBucket().getData();
}
} else if (message instanceof SaveSnapshotSuccess) {
LOG.debug("{}: got command: {}", persistenceId(), message);
SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
- deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
saved.metadata().timestamp() - 1, 0L, 0L));
persisting = false;
unstash();
}
@Override
- protected final void handleRecover(final Object message) throws Exception {
+ protected final void handleRecover(final Object message) {
if (message instanceof RecoveryCompleted) {
if (incarnation != null) {
incarnation = incarnation + 1;
}
}
- protected final RemoteRpcProviderConfig getConfig() {
+ protected final RemoteOpsProviderConfig getConfig() {
return config;
}
* @param address Remote address
* @param bucket Bucket removed
*/
- protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+ protected abstract void onBucketRemoved(Address address, Bucket<T> bucket);
/**
* Callback to subclasses invoked when the set of remote buckets is updated.
*
* @param newBuckets Map of address to new bucket. Never null, but can be empty.
*/
- protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
+ protected abstract void onBucketsUpdated(Map<Address, Bucket<T>> newBuckets);
/**
* Helper to collect all known buckets.
versions.remove(addr);
final Bucket<T> bucket = remoteBuckets.remove(addr);
if (bucket != null) {
- LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
onBucketRemoved(addr, bucket);
}
}