BUG-7556: update version tracking 22/50622/22
authorRobert Varga <rovarga@cisco.com>
Wed, 18 Jan 2017 15:01:21 +0000 (16:01 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 9 Feb 2017 12:12:20 +0000 (12:12 +0000)
This patch adds better version tracking, so it does not rely
on calendar time, but rather is monotonically increasing.

The 64bit version field is logically split into an incarnation
number (31 bits) and a version number (32 bits).

Change-Id: Ie0e1f4089cc1ee582037982d9837490348158975
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderConfig.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/mbeans/RemoteRpcRegistryMXBeanImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf

index ee94c98..23e6e68 100644 (file)
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-test-util</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-akka-raft</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
index c1846b8..1d66387 100644 (file)
@@ -24,7 +24,9 @@ public class RemoteRpcProviderConfig extends CommonConfig {
     protected static final String TAG_RPC_REGISTRY_PATH = "rpc-registry-path";
     protected static final String TAG_RPC_MGR_PATH = "rpc-manager-path";
     protected static final String TAG_ASK_DURATION = "ask-duration";
+
     private static final String TAG_GOSSIP_TICK_INTERVAL = "gossip-tick-interval";
+    private static final String TAG_RPC_REGISTRY_PERSISTENCE_ID = "rpc-registry-persistence-id";
 
     //locally cached values
     private Timeout cachedAskDuration;
@@ -56,7 +58,10 @@ public class RemoteRpcProviderConfig extends CommonConfig {
 
     public String getRpcRegistryPath() {
         return get().getString(TAG_RPC_REGISTRY_PATH);
+    }
 
+    public String getRpcRegistryPersistenceId() {
+        return get().getString(TAG_RPC_REGISTRY_PERSISTENCE_ID);
     }
 
     public String getRpcManagerPath() {
@@ -118,6 +123,8 @@ public class RemoteRpcProviderConfig extends CommonConfig {
             configHolder.put(TAG_ASK_DURATION, "15s");
             configHolder.put(TAG_GOSSIP_TICK_INTERVAL, "500ms");
 
+            // persistence
+            configHolder.put(TAG_RPC_REGISTRY_PERSISTENCE_ID, "remote-rpc-registry");
         }
 
         public Builder gossipTickInterval(final String interval) {
index c8415cc..54f7613 100644 (file)
@@ -43,7 +43,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     private final ActorRef rpcRegistrar;
 
     public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
-        super(config, new RoutingTable(rpcInvoker));
+        super(config, config.getRpcRegistryPersistenceId(), new RoutingTable(rpcInvoker));
         this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
 
@@ -61,19 +61,19 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
     }
 
     @Override
-    protected void handleReceive(final Object message) throws Exception {
+    protected void handleCommand(final Object message) throws Exception {
         if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
         } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
         } else {
-            super.handleReceive(message);
+            super.handleCommand(message);
         }
     }
 
     private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
         LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
-        updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers()));
+        updateLocalBucket(getLocalData().addRpcs(msg.getRouteIdentifiers()));
     }
 
     /**
@@ -83,7 +83,7 @@ public class RpcRegistry extends BucketStore<RoutingTable> {
      */
     private void receiveRemoveRoutes(final RemoveRoutes msg) {
         LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
-        updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers()));
+        updateLocalBucket(getLocalData().removeRpcs(msg.getRouteIdentifiers()));
     }
 
     @Override
index 4e3e551..faa51b9 100644 (file)
@@ -9,11 +9,12 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.ActorRef;
 import java.util.Optional;
+import javax.annotation.Nonnull;
 
 public interface Bucket<T extends BucketData<T>> {
     long getVersion();
 
-    T getData();
+    @Nonnull T getData();
 
     default Optional<ActorRef> getWatchActor() {
         return getData().getWatchActor();
index a927c93..ade614b 100644 (file)
@@ -7,27 +7,23 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.io.Serializable;
 
-public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
+final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
     private static final long serialVersionUID = 294779770032719196L;
 
-    private Long version = System.currentTimeMillis();
+    // Guaranteed to be non-null.
+    // This is kept a Long for binary compatibility of serialization format.
+    private final Long version;
 
-    private T data;
+    // Guaranteed to be non-null
+    private final T data;
 
-    public BucketImpl(final T data) {
-        this.data = data;
-    }
-
-    public BucketImpl(final Bucket<T> other) {
-        this.version = other.getVersion();
-        this.data = other.getData();
-    }
-
-    public void setData(final T data) {
-        this.data = data;
-        this.version = System.currentTimeMillis() + 1;
+    BucketImpl(final Long version, final T data) {
+        this.version = Preconditions.checkNotNull(version);
+        this.data = Preconditions.checkNotNull(data);
     }
 
     @Override
@@ -44,4 +40,10 @@ public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Ser
     public String toString() {
         return "BucketImpl{" + "version=" + version + ", data=" + data + '}';
     }
+
+    private Object readResolve() {
+        Verify.verifyNotNull(version);
+        Verify.verifyNotNull(data);
+        return this;
+    }
 }
index b4af0ad..70f4053 100644 (file)
@@ -8,12 +8,24 @@
 
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
+import akka.actor.PoisonPill;
 import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
@@ -21,9 +33,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
@@ -31,7 +42,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -43,12 +53,7 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
-    /**
-     * Bucket owned by the node.
-     */
-    private final BucketImpl<T> localBucket;
-
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
     /**
      * Buckets owned by other known nodes in the cluster.
      */
@@ -65,19 +70,31 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
      */
     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
+    private final RemoteRpcProviderConfig config;
+    private final String persistenceId;
+
     /**
      * Cluster address for this node.
      */
     private Address selfAddress;
 
-    // FIXME: should be part of test-specific subclass
-    private ConditionalProbe probe;
-
-    private final RemoteRpcProviderConfig config;
+    /**
+     * Bucket owned by the node. Initialized during recovery (due to incarnation number).
+     */
+    private LocalBucket<T> localBucket;
+    private T initialData;
+    private Integer incarnation;
+    private boolean persisting;
 
-    public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
+    public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
-        this.localBucket = new BucketImpl<>(initialData);
+        this.initialData = Preconditions.checkNotNull(initialData);
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+    }
+
+    @Override
+    public String persistenceId() {
+        return persistenceId;
     }
 
     @Override
@@ -92,9 +109,16 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
 
     @SuppressWarnings("unchecked")
     @Override
-    protected void handleReceive(final Object message) throws Exception {
-        if (probe != null) {
-            probe.tell(message, getSelf());
+    protected void handleCommand(final Object message) throws Exception {
+        if (message instanceof GetAllBuckets) {
+            // GetAllBuckets is used only in testing
+            receiveGetAllBuckets();
+            return;
+        }
+
+        if (persisting) {
+            handleSnapshotMessage(message);
+            return;
         }
 
         if (message instanceof GetBucketsByMembers) {
@@ -107,21 +131,57 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
             removeBucket(((RemoveRemoteBucket) message).getAddress());
         } else if (message instanceof Terminated) {
             actorTerminated((Terminated) message);
-        } else if (message instanceof GetAllBuckets) {
-            // GetAllBuckets is used only for unit tests.
-            receiveGetAllBuckets();
-        } else if (message instanceof ConditionalProbe) {
-            // The ConditionalProbe is only used for unit tests.
-            LOG.info("Received probe {} {}", getSelf(), message);
-            probe = (ConditionalProbe) message;
-            // Send back any message to tell the caller we got the probe.
-            getSender().tell("Got it", getSelf());
+        } else if (message instanceof DeleteSnapshotsSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), message);
+        } else if (message instanceof DeleteSnapshotsFailure) {
+            LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
+                ((DeleteSnapshotsFailure) message).cause());
         } else {
             LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
 
+    private void handleSnapshotMessage(final Object message) {
+        if (message instanceof SaveSnapshotFailure) {
+            LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
+            persisting = false;
+            self().tell(PoisonPill.getInstance(), ActorRef.noSender());
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug("{}: got command: {}", persistenceId(), message);
+            SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
+            deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
+                    saved.metadata().timestamp() - 1, 0L, 0L));
+            persisting = false;
+            unstash();
+        } else {
+            LOG.debug("{}: stashing command {}", persistenceId(), message);
+            stash();
+        }
+    }
+
+    @Override
+    protected void handleRecover(final Object message) throws Exception {
+        if (message instanceof RecoveryCompleted) {
+            if (incarnation != null) {
+                incarnation = incarnation + 1;
+            } else {
+                incarnation = 0;
+            }
+
+            this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
+            initialData = null;
+            LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
+            persisting = true;
+            saveSnapshot(incarnation);
+        } else if (message instanceof SnapshotOffer) {
+            incarnation = (Integer) ((SnapshotOffer)message).snapshot();
+            LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
+        } else {
+            LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
+        }
+    }
+
     protected RemoteRpcProviderConfig getConfig() {
         return config;
     }
@@ -129,7 +189,8 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
     /**
      * Returns all the buckets the this node knows about, self owned + remote.
      */
-    void receiveGetAllBuckets() {
+    @VisibleForTesting
+    protected void receiveGetAllBuckets() {
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
     }
@@ -143,7 +204,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, new BucketImpl<>(localBucket));
+        all.put(selfAddress, getLocalBucket().snapshot());
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -173,7 +234,7 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
-            buckets.put(selfAddress, new BucketImpl<>(localBucket));
+            buckets.put(selfAddress, getLocalBucket().snapshot());
         }
 
         //then get buckets for requested remote nodes
@@ -307,13 +368,34 @@ public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWi
         // Default noop
     }
 
-    public BucketImpl<T> getLocalBucket() {
+    @VisibleForTesting
+    protected boolean isPersisting() {
+        return persisting;
+    }
+
+    public T getLocalData() {
+        return getLocalBucket().getData();
+    }
+
+    private LocalBucket<T> getLocalBucket() {
+        Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
         return localBucket;
     }
 
     protected void updateLocalBucket(final T data) {
-        localBucket.setData(data);
-        versions.put(selfAddress, localBucket.getVersion());
+        final LocalBucket<T> local = getLocalBucket();
+        final boolean bumpIncarnation = local.setData(data);
+        versions.put(selfAddress, local.getVersion());
+
+        if (bumpIncarnation) {
+            LOG.debug("Version wrapped. incrementing incarnation");
+
+            Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+            incarnation = incarnation + 1;
+
+            persisting = true;
+            saveSnapshot(incarnation);
+        }
     }
 
     public Map<Address, Bucket<T>> getRemoteBuckets() {
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/LocalBucket.java
new file mode 100644 (file)
index 0000000..b2c5e0c
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Local bucket implementation. Unlike a full-blown {@link Bucket}, this class is mutable and tracks when it has been
+ * changed and when it has been sent anywhere.
+ *
+ * @author Robert Varga
+ */
+final class LocalBucket<T extends BucketData<T>> {
+    /*
+     * Decomposed 64bit signed version number. Always non-negative, hence the most significant bit is always zero.
+     * - incarnation number (most-significant 31 bits, forming an unsigned int)
+     * - version number (least-significant 32 bits, treated as unsigned int)
+     *
+     * We are keeping a boxed version here, as we stick it into a map anyway.
+     */
+    private Long version;
+    private T data;
+
+    // We bump versions only if we took a snapshot since last data update
+    private boolean bumpVersion;
+
+    LocalBucket(final int incarnation, final T data) {
+        Preconditions.checkArgument(incarnation >= 0);
+        this.version = ((long)incarnation) << Integer.SIZE;
+        this.data = Preconditions.checkNotNull(data);
+    }
+
+    T getData() {
+        return data;
+    }
+
+    Long getVersion() {
+        return version;
+    }
+
+    Bucket<T> snapshot() {
+        bumpVersion = true;
+        return new BucketImpl<>(version, data);
+    }
+
+    boolean setData(final T data) {
+        this.data = Preconditions.checkNotNull(data);
+        if (bumpVersion) {
+            final long next = version.longValue() + 1;
+            if ((next & 0xffff_ffffL) == 0) {
+                return true;
+            }
+
+            version = next;
+            bumpVersion = false;
+        }
+        return false;
+    }
+}
index 6f81fe8..361f5b7 100644 (file)
@@ -12,6 +12,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
@@ -50,7 +52,7 @@ public class Messages {
 
             protected ContainsBuckets(final Map<Address, Bucket<T>> buckets) {
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
-                this.buckets = ImmutableMap.copyOf(buckets);
+                this.buckets = Collections.unmodifiableMap(new HashMap<>(buckets));
             }
 
             public final Map<Address, Bucket<T>> getBuckets() {
index 5fbd91c..339d4b1 100644 (file)
@@ -45,7 +45,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getGlobalRpc() {
-        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        RoutingTable table = rpcRegistry.getLocalData();
         Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() == null) {
@@ -59,7 +59,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Set<String> getLocalRegisteredRoutedRpc() {
-        RoutingTable table = rpcRegistry.getLocalBucket().getData();
+        RoutingTable table = rpcRegistry.getLocalData();
         Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
         for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
             if (route.getRoute() != null) {
@@ -76,7 +76,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
 
     @Override
     public Map<String, String> findRpcByName(final String name) {
-        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+        RoutingTable localTable = rpcRegistry.getLocalData();
         // Get all RPCs from local bucket
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByName(localTable, name, LOCAL_CONSTANT));
 
@@ -92,8 +92,8 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     }
 
     @Override
-    public Map<String, String> findRpcByRoute(String routeId) {
-        RoutingTable localTable = rpcRegistry.getLocalBucket().getData();
+    public Map<String, String> findRpcByRoute(final String routeId) {
+        RoutingTable localTable = rpcRegistry.getLocalData();
         Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
 
         Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
@@ -109,7 +109,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     /**
      * Search if the routing table route String contains routeName.
      */
-    private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
+    private static Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
                                                       final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
@@ -130,7 +130,7 @@ public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements Remot
     /**
      * Search if the routing table route type contains name.
      */
-    private Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
+    private static Map<String, String>  getRpcMemberMapByName(final RoutingTable table, final String name,
                                                        final String address) {
         Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
         Map<String, String> rpcMap = new HashMap<>(routes.size());
index 06781b4..76e63c4 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -32,7 +33,8 @@ public class RemoteRpcProviderTest {
 
     @BeforeClass
     public static void setup() throws InterruptedException {
-        moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+        moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc")
+                .withConfigReader(ConfigFactory::load).build();
         final Config config = moduleConfig.get();
         system = ActorSystem.create("odl-cluster-rpc", config);
 
index 3ceb5c0..0c3a57e 100644 (file)
@@ -80,7 +80,7 @@ public class RpcRegistryTest {
 
     @BeforeClass
     public static void staticSetup() throws InterruptedException {
-        AkkaConfigurationReader reader = () -> ConfigFactory.load();
+        AkkaConfigurationReader reader = ConfigFactory::load;
 
         RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
                 .withConfigReader(reader).build();
index 4e3961a..59dd29b 100644 (file)
@@ -7,22 +7,39 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import static akka.actor.ActorRef.noSender;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
+import akka.actor.Status.Success;
+import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 public class BucketStoreTest {
 
@@ -41,6 +58,8 @@ public class BucketStoreTest {
 
     private static ActorSystem system;
 
+    private JavaTestKit kit;
+
     @BeforeClass
     public static void setup() {
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
@@ -52,24 +71,33 @@ public class BucketStoreTest {
         JavaTestKit.shutdownActorSystem(system);
     }
 
+    @Before
+    public void before() {
+        kit = new JavaTestKit(system);
+    }
+
+    @After
+    public void after() {
+        kit.shutdown(system);
+    }
+
     /**
      * Given remote buckets, should merge with local copy of remote buckets.
      */
     @Test
-    public void testReceiveUpdateRemoteBuckets() {
-
-        final BucketStore<T> store = createStore();
+    public void testReceiveUpdateRemoteBuckets() throws Exception {
 
+        final ActorRef store = createStore();
         Address localAddress = system.provider().getDefaultAddress();
-        Bucket<T> localBucket = new BucketImpl<>(new T());
+        Bucket<T> localBucket = new BucketImpl<>(0L, new T());
 
         Address a1 = new Address("tcp", "system1");
         Address a2 = new Address("tcp", "system2");
         Address a3 = new Address("tcp", "system3");
 
-        Bucket<T> b1 = new BucketImpl<>(new T());
-        Bucket<T> b2 = new BucketImpl<>(new T());
-        Bucket<T> b3 = new BucketImpl<>(new T());
+        Bucket<T> b1 = new BucketImpl<>(0L, new T());
+        Bucket<T> b2 = new BucketImpl<>(0L, new T());
+        Bucket<T> b3 = new BucketImpl<>(0L, new T());
 
         Map<Address, Bucket<T>> remoteBuckets = new HashMap<>(3);
         remoteBuckets.put(a1, b1);
@@ -77,38 +105,40 @@ public class BucketStoreTest {
         remoteBuckets.put(a3, b3);
         remoteBuckets.put(localAddress, localBucket);
 
+        Await.result(Patterns.ask(store, new WaitUntilDonePersisting(),
+                Timeout.apply(5, TimeUnit.SECONDS)), Duration.Inf());
+
         //Given remote buckets
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
-        //Should NOT contain local bucket
-        //Should contain ONLY 3 entries i.e a1, a2, a3
-        Map<Address, Bucket<T>> remoteBucketsInStore = store.getRemoteBuckets();
-        Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
-        Assert.assertTrue(remoteBucketsInStore.size() == 3);
+        //Should contain local bucket
+        //Should contain 4 entries i.e a1, a2, a3, local
+        Map<Address, Bucket<T>> remoteBucketsInStore = getBuckets(store);
+        Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Add a new remote bucket
         Address a4 = new Address("tcp", "system4");
-        Bucket<T> b4 = new BucketImpl<>(new T());
+        Bucket<T> b4 = new BucketImpl<>(0L, new T());
         remoteBuckets.clear();
         remoteBuckets.put(a4, b4);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should contain a4
-        //Should contain 4 entries now i.e a1, a2, a3, a4
-        remoteBucketsInStore = store.getRemoteBuckets();
+        //Should contain 5 entries now i.e a1, a2, a3, a4, local
+        remoteBucketsInStore = getBuckets(store);
         Assert.assertTrue("Does not contain a4", remoteBucketsInStore.containsKey(a4));
-        Assert.assertTrue(remoteBucketsInStore.size() == 4);
+        Assert.assertTrue(remoteBucketsInStore.size() == 5);
 
         //Update a bucket
-        Bucket<T> b3New = new BucketImpl<>(new T());
+        Bucket<T> b3New = new BucketImpl<>(0L, new T());
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3New);
         remoteBuckets.put(a1, null);
         remoteBuckets.put(a2, null);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should only update a3
-        remoteBucketsInStore = store.getRemoteBuckets();
+        remoteBucketsInStore = getBuckets(store);
         Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
 
@@ -117,11 +147,11 @@ public class BucketStoreTest {
         Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
         Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
         Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
-        Assert.assertTrue(remoteBucketsInStore.size() == 4);
+        Assert.assertTrue(remoteBucketsInStore.size() == 5);
 
         //Should update versions map
         //versions map contains versions for all remote buckets (4).
-        Map<Address, Long> versionsInStore = store.getVersions();
+        Map<Address, Long> versionsInStore = getVersions(store);
         Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals((Long)b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals((Long)b2.getVersion(), versionsInStore.get(a2));
@@ -131,13 +161,12 @@ public class BucketStoreTest {
         //Send older version of bucket
         remoteBuckets.clear();
         remoteBuckets.put(a3, b3);
-        store.receiveUpdateRemoteBuckets(remoteBuckets);
+        store.tell(new UpdateRemoteBuckets<>(remoteBuckets), noSender());
 
         //Should NOT update a3
-        remoteBucketsInStore = store.getRemoteBuckets();
+        remoteBucketsInStore = getBuckets(store);
         b3InStore = remoteBucketsInStore.get(a3);
         Assert.assertEquals(b3InStore.getVersion(), b3New.getVersion());
-
     }
 
     /**
@@ -145,10 +174,63 @@ public class BucketStoreTest {
      *
      * @return instance of BucketStore class
      */
-    private static BucketStore<T> createStore() {
-        final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()),
-            new T());
-        final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
-        return testRef.underlyingActor();
+    private ActorRef createStore() {
+        return kit.childActorOf(Props.create(TestingBucketStore.class,
+            new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
+        final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
+                Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
+        return result.getBuckets();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
+        return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
+            Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
+    }
+
+    private static final class TestingBucketStore extends BucketStore<T> {
+
+        private final List<ActorRef> toNotify = new ArrayList<>();
+
+        TestingBucketStore(final RemoteRpcProviderConfig config,
+                                  final String persistenceId,
+                                  final T initialData) {
+            super(config, persistenceId, initialData);
+        }
+
+        @Override
+        protected void handleCommand(Object message) throws Exception {
+            if (message instanceof WaitUntilDonePersisting) {
+                handlePersistAsk();
+            } else if (message instanceof SaveSnapshotSuccess) {
+                super.handleCommand(message);
+                handleSnapshotSuccess();
+            } else {
+                super.handleCommand(message);
+            }
+        }
+
+        private void handlePersistAsk() {
+            if (isPersisting()) {
+                toNotify.add(getSender());
+            } else {
+                getSender().tell(new Success(null), noSender());
+            }
+        }
+
+        private void handleSnapshotSuccess() {
+            toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
+        }
+    }
+
+    /**
+     * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
+     */
+    private static final class WaitUntilDonePersisting {
+
     }
 }
index 524e69f..11ad5ec 100644 (file)
@@ -40,6 +40,8 @@ unit-test {
   akka {
     loglevel = "DEBUG"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
   }
   bounded-mailbox {
     mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
@@ -47,6 +49,17 @@ unit-test {
     mailbox-capacity = 1000
     mailbox-push-timeout-time = 10ms
   }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 
 memberA {
@@ -58,6 +71,9 @@ memberA {
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -82,6 +98,16 @@ memberA {
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 memberB {
   bounded-mailbox {
@@ -93,6 +119,9 @@ memberB {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
 
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -117,6 +146,16 @@ memberB {
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 memberC {
   bounded-mailbox {
@@ -127,6 +166,10 @@ memberC {
   akka {
     loglevel = "INFO"
     loggers = ["akka.event.slf4j.Slf4jLogger"]
+
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -151,5 +194,15 @@ memberC {
       auto-down-unreachable-after = 10s
     }
   }
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    # Class name of the plugin.
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    # Dispatcher for the plugin actor.
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
 }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.