BUG-7556: update version tracking 22/50622/22
authorRobert Varga <rovarga@cisco.com>
Wed, 18 Jan 2017 15:01:21 +0000 (16:01 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 9 Feb 2017 12:12:20 +0000 (12:12 +0000)
This patch adds better version tracking, so it does not rely
on calendar time, but rather is monotonically increasing.

The 64bit version field is logically split into an incarnation
number (31 bits) and a version number (32 bits).

Change-Id: Ie0e1f4089cc1ee582037982d9837490348158975
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf

index ee94c9818174faf25b508dee9c37a19154ff71dd..23e6e689b325a144b1ef9cacdb560e331844b4ef 100644 (file)
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-test-util</artifactId>
         </dependency>
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-test-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-akka-raft</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
     </dependencies>
 
     <build>
index c1846b8e69a034678d1ee1c0ad63ee3f2dd6ea15..1d663877c50f3e07150ba350370c5a8f014bfc0b 100644 (file)
@@ -24,7 +24,9 @@ public class RemoteRpcProviderConfig extends CommonConfig {
     protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
     protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
     protected static final String TAG_ASK_DURATION = "ask-duration";
     protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
     protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
     protected static final String TAG_ASK_DURATION = "ask-duration";
+
     private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
     private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+    private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id";
 
     //locally cached values
     private Timeout cachedAskDuration;
 
     //locally cached values
     private Timeout cachedAskDuration;
@@ -56,7 +58,10 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
     public String getRpcRegistryPath() {
         return get().getString(TAG_RPC_REGISTRY_PATH);
 
     public String getRpcRegistryPath() {
         return get().getString(TAG_RPC_REGISTRY_PATH);
+    }
 
 
+    public String getRpcRegistryPersistenceId() {
+        return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID);
     }
 
     public String getRpcManagerPath() {
     }
 
     public String getRpcManagerPath() {
@@ -118,6 +123,8 @@ public class RemoteRpcProviderConfig extends CommonConfig {
             configHolder.put(TAG_ASK_DURATION, "15s");
             configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
 
             configHolder.put(TAG_ASK_DURATION, "15s");
             configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
 
+            // persistence
+            configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry");
         }
 
         public Builder gossipTickInterval(final String interval) {
         }
 
         public Builder gossipTickInterval(final String interval) {
index c8415cc818733ff67bcc9828eaf96eceac916f7c..54f76132090bc58da3ade977d5e4344416dec0c0 100644 (file)
@@ -43,7 +43,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     private final ActorRef rpcRegistrar;
 
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
     private final ActorRef rpcRegistrar;
 
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
-        super(config, new RoutingTable(rpcInvoker));
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
@@ -61,19 +61,19 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     }
 
     @Override
     }
 
     @Override
-    protected void handleReceive(final Object message) throws Exception {
+    protected void handleCommand(final Object message) throws Exception {
         if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else {
         if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else {
-            super.handleReceive(message);
+            super.handleCommand(message);
         }
     }
 
     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
         }
     }
 
     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-        updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers()));
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
     }
 
     /**
@@ -83,7 +83,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      */
     private void receiveRemoveRoutes(final RemoveRoutes msg) {
         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
      */
     private void receiveRemoveRoutes(final RemoveRoutes msg) {
         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
-        updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers()));
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
     }
 
     @Override
     }
 
     @Override
index 4e3e5519fb10410a4d569ef24beab0837bba8c0a..faa51b90ade84b33c887491747ac3de88c331cf3 100644 (file)
@@ -9,11 +9,12 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.ActorRef;
 import java.util.Optional;
 
 import akka.actor.ActorRef;
 import java.util.Optional;
+import javax.annotation.Nonnull;
 
 public interface Bucket<T extends BucketData<T>> {
     long getVersion();
 
 
 public interface Bucket<T extends BucketData<T>> {
     long getVersion();
 
-    T getData();
+    @Nonnull T getData();
 
     default Optional<ActorRef> getWatchActor() {
         return getData().getWatchActor();
 
     default Optional<ActorRef> getWatchActor() {
         return getData().getWatchActor();
index a927c935331e31d13737ffb49a649cdc626c88f5..ade614b8d765ed23205d0565bbf0b428db484897 100644 (file)
@@ -7,27 +7,23 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.io.Serializable;
 
 import java.io.Serializable;
 
-public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
+final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
     private static final long serialVersionUID = 294779770032719196L;
 
     private static final long serialVersionUID = 294779770032719196L;
 
-    private Long version = System.currentTimeMillis();
+    // Guaranteed to be non-null.
+    // This is kept a Long for binary compatibility of serialization format.
+    private final Long version;
 
 
-    private T data;
+    // Guaranteed to be non-null
+    private final T data;
 
 
-    public BucketImpl(final T data) {
-        this.data = data;
-    }
-
-    public BucketImpl(final Bucket<T> other) {
-        this.version = other.getVersion();
-        this.data = other.getData();
-    }
-
-    public void setData(final T data) {
-        this.data = data;
-        this.version = System.currentTimeMillis() + 1;
+    BucketImpl(final Long version, final T data) {
+        this.version = Preconditions.checkNotNull(version);
+        this.data = Preconditions.checkNotNull(data);
     }
 
     @Override
     }
 
     @Override
@@ -44,4 +40,10 @@ public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Ser
     public String toString() {
         return "BucketImpl{" + "version=" + version + ", data=" + data + '}';
     }
     public String toString() {
         return "BucketImpl{" + "version=" + version + ", data=" + data + '}';
     }
+
+    private Object readResolve() {
+        Verify.verifyNotNull(version);
+        Verify.verifyNotNull(data);
+        return this;
+    }
 }
 }
index b4af0adfe5f4160868752aab7e0210a8607c3910..70f4053723d4c426be572de0c9c69fbb3e6950c8 100644 (file)
@@ -8,12 +8,24 @@
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
+import akka.actor.PoisonPill;
 import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
 import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
@@ -21,9 +33,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
@@ -31,7 +42,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -43,12 +53,7 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
-    /**
-     * Bucket owned by the node.
-     */
-    private final BucketImpl<T> localBucket;
-
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
     /**
      * Buckets owned by other known nodes in the cluster.
      */
     /**
      * Buckets owned by other known nodes in the cluster.
      */
@@ -65,19 +70,31 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
      */
     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
      */
     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
+    private final RemoteRpcProviderConfig config;
+    private final String persistenceId;
+
     /**
      * Cluster address for this node.
      */
     private Address selfAddress;
 
     /**
      * Cluster address for this node.
      */
     private Address selfAddress;
 
-    // FIXME: should be part of test-specific subclass
-    private ConditionalProbe probe;
-
-    private final RemoteRpcProviderConfig config;
+    /**
+     * Bucket owned by the node. Initialized during recovery (due to incarnation number).
+     */
+    private LocalBucket<T> localBucket;
+    private T initialData;
+    private Integer incarnation;
+    private boolean persisting;
 
 
-    public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
+    public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
         this.config = Preconditions.checkNotNull(config);
-        this.localBucket = new BucketImpl<>(initialData);
+        this.initialData = Preconditions.checkNotNull(initialData);
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+    }
+
+    @Override
+    public String persistenceId() {
+        return persistenceId;
     }
 
     @Override
     }
 
     @Override
@@ -92,9 +109,16 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
 
     @SuppressWarnings("unchecked")
     @Override
 
     @SuppressWarnings("unchecked")
     @Override
-    protected void handleReceive(final Object message) throws Exception {
-        if (probe != null) {
-            probe.tell(message, getSelf());
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof GetAllBuckets) {
+            // GetAllBuckets is used only in testing
+            receiveGetAllBuckets();
+            return;
+        }
+
+        if (persisting) {
+            handleSnapshotMessage(message);
+            return;
         }
 
         if (message instanceof GetBucketsByMembers) {
         }
 
         if (message instanceof GetBucketsByMembers) {
@@ -107,21 +131,57 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
             removeBucket(((RemoveRemoteBucket) message).getAddress());
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
             removeBucket(((RemoveRemoteBucket) message).getAddress());
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
-        } else if (message instanceof GetAllBuckets) {
-            // GetAllBuckets is used only for unit tests.
-            receiveGetAllBuckets();
-        } else if (message instanceof ConditionalProbe) {
-            // The ConditionalProbe is only used for unit tests.
-            LOG.info("Received probe {} {}", getSelf(), message);
-            probe = (ConditionalProbe) message;
-            // Send back any message to tell the caller we got the probe.
-            getSender().tell("Got it", getSelf());
+        } else if (message instanceof DeleteSnapshotsSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), message);
+        } else if (message instanceof DeleteSnapshotsFailure) {
+            LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
+                ((DeleteSnapshotsFailure) message).cause());
         } else {
             LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
 
         } else {
             LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
 
+    private void handleSnapshotMessage(final Object message) {
+        if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
+            persisting = false;
+            self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), message);
+            SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
+            deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
+                    saved.metadata().timestamp() - 1, 0L, 0L));
+            persisting = false;
+            unstash();
+        } else {
+            LOG.debug("{}: stashing command {}", persistenceId(), message);
+            stash();
+        }
+    }
+
+    @Override
+    protected void handleRecover(final Object message) throws Exception {
+        if (message instanceof RecoveryCompleted) {
+            if (incarnation != null) {
+                incarnation = incarnation + 1;
+            } else {
+                incarnation = 0;
+            }
+
+            this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
+            initialData = null;
+            LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
+            persisting = true;
+            saveSnapshot(incarnation);
+        } else if (message instanceof SnapshotOffer) {
+            incarnation = (Integer) ((SnapshotOffer)message).snapshot();
+            LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
+        } else {
+            LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
+        }
+    }
+
     protected RemoteRpcProviderConfig getConfig() {
         return config;
     }
     protected RemoteRpcProviderConfig getConfig() {
         return config;
     }
@@ -129,7 +189,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
     /**
      * Returns all the buckets the this node knows about, self owned + remote.
      */
     /**
      * Returns all the buckets the this node knows about, self owned + remote.
      */
-    void receiveGetAllBuckets() {
+    @VisibleForTesting
+    protected void receiveGetAllBuckets() {
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
     }
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
     }
@@ -143,7 +204,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, new BucketImpl<>(localBucket));
+        all.put(selfAddress, getLocalBucket().snapshot());
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -173,7 +234,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
-            buckets.put(selfAddress, new BucketImpl<>(localBucket));
+            buckets.put(selfAddress, getLocalBucket().snapshot());
         }
 
         //then get buckets for requested remote nodes
         }
 
         //then get buckets for requested remote nodes
@@ -307,13 +368,34 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
         // Default noop
     }
 
         // Default noop
     }
 
-    public BucketImpl<T> getLocalBucket() {
+    @VisibleForTesting
+    protected boolean isPersisting() {
+        return persisting;
+    }
+
+    public T getLocalData() {
+        return getLocalBucket().getData();
+    }
+
+    private LocalBucket<T> getLocalBucket() {
+        Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
 
     protected void updateLocalBucket(final T data) {
         return localBucket;
     }
 
     protected void updateLocalBucket(final T data) {
-        localBucket.setData(data);
-        versions.put(selfAddress, localBucket.getVersion());
+        final LocalBucket<T> local = getLocalBucket();
+        final boolean bumpIncarnation = local.setData(data);
+        versions.put(selfAddress, local.getVersion());
+
+        if (bumpIncarnation) {
+            LOG.debug("Version wrapped. incrementing incarnation");
+
+            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+            incarnation = incarnation + 1;
+
+            persisting = true;
+            saveSnapshot(incarnation);
+        }
     }
 
     public Map<Address, Bucket<T>> getRemoteBuckets() {
     }
 
     public Map<Address, Bucket<T>> getRemoteBuckets() {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java
new file mode 100644 (file)
index 0000000..b2c5e0c
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Local bucket implementation. Unlike a full-blown {@link Bucket}, this class is mutable and tracks when it has been
+ * changed and when it has been sent anywhere.
+ *
+ * @author Robert Varga
+ */
+final class LocalBucket<T extends BucketData<T>> {
+    /*
+     * Decomposed 64bit signed version number. Always non-negative, hence the most significant bit is always zero.
+     * - incarnation number (most-significant 31 bits, forming an unsigned int)
+     * - version number (least-significant 32 bits, treated as unsigned int)
+     *
+     * We are keeping a boxed version here, as we stick it into a map anyway.
+     */
+    private Long version;
+    private T data;
+
+    // We bump versions only if we took a snapshot since last data update
+    private boolean bumpVersion;
+
+    LocalBucket(final int incarnation, final T data) {
+        Preconditions.checkArgument(incarnation >= 0);
+        this.version = ((long)incarnation) << Integer.SIZE;
+        this.data = Preconditions.checkNotNull(data);
+    }
+
+    T getData() {
+        return data;
+    }
+
+    Long getVersion() {
+        return version;
+    }
+
+    Bucket<T> snapshot() {
+        bumpVersion = true;
+        return new BucketImpl<>(version, data);
+    }
+
+    boolean setData(final T data) {
+        this.data = Preconditions.checkNotNull(data);
+        if (bumpVersion) {
+            final long next = version.longValue() + 1;
+            if ((next & 0xffff_ffffL) == 0) {
+                return true;
+            }
+
+            version = next;
+            bumpVersion = false;
+        }
+        return false;
+    }
+}
index 6f81fe8b32e3c30ddfa3325a0c9b81b2a3d6ca93..361f5b7e1c07bf4b3cf07539a58bdf421cf177bb 100644 (file)
@@ -12,6 +12,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
@@ -50,7 +52,7 @@ public class Messages {
 
             protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
 
             protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
-                this.buckets = ImmutableMap.copyOf(buckets);
+                this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets));
             }
 
             public final Map<Address, Bucket<T>> getBuckets() {
             }
 
             public final Map<Address, Bucket<T>> getBuckets() {
index 5fbd91cc81c349ad0244ac4b0d96c39f6ab4ebf7..339d4b1a3285b58ac1203523b996c8a268b9fb9d 100644 (file)
@@ -45,7 +45,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getGlobalRpc() {
 
     @Override
     public Set<String> getGlobalRpc() {
-        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        RoutingTable table = rpcRegistry.getLocalData();
         Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() == null) {
         Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() == null) {
@@ -59,7 +59,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getLocalRegisteredRoutedRpc() {
 
     @Override
     public Set<String> getLocalRegisteredRoutedRpc() {
-        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        RoutingTable table = rpcRegistry.getLocalData();
         Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() != null) {
         Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() != null) {
@@ -76,7 +76,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByName(final String name) {
 
     @Override
     public Map<String, String> findRpcByName(final String name) {
-        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        RoutingTable localTable = rpcRegistry.getLocalData();
         // Get all RPCs from local bucket
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
 
         // Get all RPCs from local bucket
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
 
@@ -92,8 +92,8 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     }
 
     @Override
     }
 
     @Override
-    public Map<String, String> findRpcByRoute(String routeId) {
-        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+    public Map<String, String> findRpcByRoute(final String routeId) {
+        RoutingTable localTable = rpcRegistry.getLocalData();
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
 
         Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
 
         Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
@@ -109,7 +109,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     /**
      * Search if the routing table route String contains routeName.
      */
     /**
      * Search if the routing table route String contains routeName.
      */
-    private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+    private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
                                                       final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
                                                       final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
@@ -130,7 +130,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     /**
      * Search if the routing table route type contains name.
      */
     /**
      * Search if the routing table route type contains name.
      */
-    private Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
+    private static Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
                                                        final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
                                                        final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
index 06781b430d0081657a9477b03e5869b0521d1b71..76e63c4e1d3da01d8aa7ce7a864f360d6f6b4dfe 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -32,7 +33,8 @@ public class RemoteRpcProviderTest {
 
     @BeforeClass
     public static void setup() throws InterruptedException {
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+        moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc")
+                .withConfigReader(ConfigFactory::load).build();
         final Config config = moduleConfig.get();
         system = ActorSystem.create("odl-cluster-rpc", config);
 
         final Config config = moduleConfig.get();
         system = ActorSystem.create("odl-cluster-rpc", config);
 
index 3ceb5c041cbe0cd0523496497e9bfe2f0f1aa289..0c3a57e8c9661e7c000884e62ba0010d376f8033 100644 (file)
@@ -80,7 +80,7 @@ public class RpcRegistryTest {
 
     @BeforeClass
     public static void staticSetup() throws InterruptedException {
 
     @BeforeClass
     public static void staticSetup() throws InterruptedException {
-        AkkaConfigurationReader reader = () -> ConfigFactory.load();
+        AkkaConfigurationReader reader = ConfigFactory::load;
 
         RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
 
         RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
index 4e3961aac139f653a8886bf5eb18937891df21c9..59dd29b3142f5c8f80ad81dce17fa859c382eb57 100644 (file)
@@ -7,22 +7,39 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static akka.actor.ActorRef.noSender;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.actor.Status.Success;
+import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.JavaTestKit;
 import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 public class BucketStoreTest {
 
 
 public class BucketStoreTest {
 
@@ -41,6 +58,8 @@ public class BucketStoreTest {
 
     private static ActorSystem system;
 
 
     private static ActorSystem system;
 
+    private JavaTestKit kit;
+
     @BeforeClass
     public static void setup() {
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
     @BeforeClass
     public static void setup() {
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
@@ -52,24 +71,33 @@ public class BucketStoreTest {
         JavaTestKit.shutdownActorSystem(system);
     }
 
         JavaTestKit.shutdownActorSystem(system);
     }
 
+    @Before
+    public void before() {
+        kit = new JavaTestKit(system);
+    }
+
+    @After
+    public void after() {
+        kit.shutdown(system);
+    }
+
     /**
      * Given remote buckets, should merge with local copy of remote buckets.
      */
     @Test
     /**
      * Given remote buckets, should merge with local copy of remote buckets.
      */
     @Test
-    public void testReceiveUpdateRemoteBuckets() {
-
-        final BucketStore<T> store = createStore();
+    public void testReceiveUpdateRemoteBuckets() throws Exception {
 
 
+        final ActorRef store = createStore();
         Address localAddress = system.provider().getDefaultAddress();
         Address localAddress = system.provider().getDefaultAddress();
-        Bucket<T> localBucket = new BucketImpl<>(new T());
+        Bucket<T> localBucket = new BucketImpl<>(0L, new T());
 
         Address a1 = new Address("tcp", "system1");
         Address a2 = new Address("tcp", "system2");
         Address a3 = new Address("tcp", "system3");
 
 
         Address a1 = new Address("tcp", "system1");
         Address a2 = new Address("tcp", "system2");
         Address a3 = new Address("tcp", "system3");
 
-        Bucket<T> b1 = new BucketImpl<>(new T());
-        Bucket<T> b2 = new BucketImpl<>(new T());
-        Bucket<T> b3 = new BucketImpl<>(new T());
+        Bucket<T> b1 = new BucketImpl<>(0L, new T());
+        Bucket<T> b2 = new BucketImpl<>(0L, new T());
+        Bucket<T> b3 = new BucketImpl<>(0L, new T());
 
         Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
         remoteBuckets.put(a1, b1);
 
         Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
         remoteBuckets.put(a1, b1);
@@ -77,38 +105,40 @@ public class BucketStoreTest {
         remoteBuckets.put(a3, b3);
         remoteBuckets.put(localAddress, localBucket);
 
         remoteBuckets.put(a3, b3);
         remoteBuckets.put(localAddress, localBucket);
 
+        Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
+                Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+
         //Given remote buckets
         //Given remote buckets
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
 
-        //Should NOT contain local bucket
-        //Should contain ONLY 3 entries i.e a1, a2, a3
-        Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
-        Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
-        Assert.assertTrue(remoteBucketsInStore.size() == 3);
+        //Should contain local bucket
+        //Should contain 4 entries i.e a1, a2, a3, local
+        Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
+        Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Add a new remote bucket
         Address a4 = new Address("tcp", "system4");
 
         //Add a new remote bucket
         Address a4 = new Address("tcp", "system4");
-        Bucket<T> b4 = new BucketImpl<>(new T());
+        Bucket<T> b4 = new BucketImpl<>(0L, new T());
         remoteBuckets.clear();
         remoteBuckets.put(a4, b4);
         remoteBuckets.clear();
         remoteBuckets.put(a4, b4);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should contain a4
 
         //Should contain a4
-        //Should contain 4 entries now i.e a1, a2, a3, a4
-        remoteBucketsInStore = store.getRemoteBuckets();
+        //Should contain 5 entries now i.e a1, a2, a3, a4, local
+        remoteBucketsInStore = getBuckets(store);
         Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
         Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
-        Assert.assertTrue(remoteBucketsInStore.size() == 4);
+        Assert.assertTrue(remoteBucketsInStore.size() == 5);
 
         //Update a bucket
 
         //Update a bucket
-        Bucket<T> b3New = new BucketImpl<>(new T());
+        Bucket<T> b3New = new BucketImpl<>(0L, new T());
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3New);
         remoteBuckets.put(a1, null);
         remoteBuckets.put(a2, null);
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3New);
         remoteBuckets.put(a1, null);
         remoteBuckets.put(a2, null);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should only update a3
 
         //Should only update a3
-        remoteBucketsInStore = store.getRemoteBuckets();
+        remoteBucketsInStore = getBuckets(store);
         Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
 
         Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
 
@@ -117,11 +147,11 @@ public class BucketStoreTest {
         Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
         Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
         Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
         Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
         Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
         Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
-        Assert.assertTrue(remoteBucketsInStore.size() == 4);
+        Assert.assertTrue(remoteBucketsInStore.size() == 5);
 
         //Should update versions map
         //versions map contains versions for all remote buckets (4).
 
         //Should update versions map
         //versions map contains versions for all remote buckets (4).
-        Map<Address, Long> versionsInStore = store.getVersions();
+        Map<Address, Long> versionsInStore = getVersions(store);
         Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
         Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
@@ -131,13 +161,12 @@ public class BucketStoreTest {
         //Send older version of bucket
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3);
         //Send older version of bucket
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should NOT update a3
 
         //Should NOT update a3
-        remoteBucketsInStore = store.getRemoteBuckets();
+        remoteBucketsInStore = getBuckets(store);
         b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
         b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
-
     }
 
     /**
     }
 
     /**
@@ -145,10 +174,63 @@ public class BucketStoreTest {
      *
      * @return instance of BucketStore class
      */
      *
      * @return instance of BucketStore class
      */
-    private static BucketStore<T> createStore() {
-        final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()),
-            new T());
-        final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
-        return testRef.underlyingActor();
+    private ActorRef createStore() {
+        return kit.childActorOf(Props.create(TestingBucketStore.class,
+            new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
+        final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
+                Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
+        return result.getBuckets();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
+        return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
+            Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
+    }
+
+    private static final class TestingBucketStore extends BucketStore<T> {
+
+        private final List<ActorRef> toNotify = new ArrayList<>();
+
+        TestingBucketStore(final RemoteRpcProviderConfig config,
+                                  final String persistenceId,
+                                  final T initialData) {
+            super(config, persistenceId, initialData);
+        }
+
+        @Override
+        protected void handleCommand(Object message) throws Exception {
+            if (message instanceof WaitUntilDonePersisting) {
+                handlePersistAsk();
+            } else if (message instanceof SaveSnapshotSuccess) {
+                super.handleCommand(message);
+                handleSnapshotSuccess();
+            } else {
+                super.handleCommand(message);
+            }
+        }
+
+        private void handlePersistAsk() {
+            if (isPersisting()) {
+                toNotify.add(getSender());
+            } else {
+                getSender().tell(new Success(null), noSender());
+            }
+        }
+
+        private void handleSnapshotSuccess() {
+            toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
+        }
+    }
+
+    /**
+     * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
+     */
+    private static final class WaitUntilDonePersisting {
+
     }
 }
     }
 }
index 524e69ffe3f45f2a26e07a018a82d522ee21caa8..11ad5eceb72638703ade0672dbca6ac15519a4a2 100644 (file)
@@ -40,6 +40,8 @@ unit-test {
   akka {
     loglevel = "DEBUG"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
   akka {
     loglevel = "DEBUG"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
   }
   bounded-mailbox {
     mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
   }
   bounded-mailbox {
     mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
@@ -47,6 +49,17 @@ unit-test {
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 
 memberA {
 }
 
 memberA {
@@ -58,6 +71,9 @@ memberA {
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -82,6 +98,16 @@ memberA {
       auto-down-unreachable-after = 10s
     }
   }
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 memberB {
   bounded-mailbox {
 }
 memberB {
   bounded-mailbox {
@@ -93,6 +119,9 @@ memberB {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
 
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
 
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -117,6 +146,16 @@ memberB {
       auto-down-unreachable-after = 10s
     }
   }
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 memberC {
   bounded-mailbox {
 }
 memberC {
   bounded-mailbox {
@@ -127,6 +166,10 @@ memberC {
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -151,5 +194,15 @@ memberC {
       auto-down-unreachable-after = 10s
     }
   }
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 
 }