BUG 2817 - Basic implementation of RemoveServer in the Raft code 03/29803/3
authorMoiz Raja <moraja@cisco.com>
Mon, 16 Nov 2015 23:56:46 +0000 (15:56 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 18 Nov 2015 01:06:38 +0000 (01:06 +0000)
When a RemoveServer is received it may ask for the removal of
a the current leader or one of the followers. As a first pass
we do not support removal of the current leader. To correctly
implement removal of the leader we would have to implement
leader transition which I intend to build in a future patch.

When a follower is removed the server configuration is changed
immediately on the leader and the new configuration persisted
to the journal. When other followers receive the removed
journal entry they would also remove the server from their
configuration, this is the same as what was done for the
AddServer implementation.

As soon as then new configuration is persisted we respond with
success to the caller. This is the same as for AddServer.

When the ServerConfiguration is complete we send a ServerRemoved
message to the follower which has been removed.

Change-Id: I2b85d82cbeef13cca830e3cc212aebbbcd95c818
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServer.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServerReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerChangeStatus.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java

index 39f49931a143d68cb0cd27c90154b2fe4bc0f901..207642a7213e637fc8af6539ea9e989c0a75d43e 100644 (file)
@@ -22,7 +22,10 @@ import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
@@ -51,14 +54,17 @@ class RaftActorServerConfigurationSupport {
 
     boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
         if(message instanceof AddServer) {
-            onAddServer((AddServer)message, raftActor, sender);
+            onAddServer((AddServer) message, raftActor, sender);
+            return true;
+        } else if(message instanceof RemoveServer) {
+            onRemoveServer((RemoveServer) message, raftActor, sender);
             return true;
         } else if (message instanceof FollowerCatchUpTimeout) {
-            currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout)message);
+            currentOperationState.onFollowerCatchupTimeout(raftActor, (FollowerCatchUpTimeout) message);
             return true;
         } else if (message instanceof UnInitializedFollowerSnapshotReply) {
             currentOperationState.onUnInitializedFollowerSnapshotReply(raftActor,
-                    (UnInitializedFollowerSnapshotReply)message);
+                    (UnInitializedFollowerSnapshotReply) message);
             return true;
         } else if(message instanceof ApplyState) {
             return onApplyState((ApplyState) message, raftActor);
@@ -70,6 +76,19 @@ class RaftActorServerConfigurationSupport {
         }
     }
 
+    private void onRemoveServer(RemoveServer removeServer, RaftActor raftActor, ActorRef sender) {
+        if(removeServer.getServerId().equals(raftActor.getLeaderId())){
+            // Removing current leader is not supported yet
+            // TODO: To properly support current leader removal we need to first implement transfer of leadership
+            LOG.debug("Cannot remove {} replica because it is the Leader", removeServer.getServerId());
+            sender.tell(new RemoveServerReply(ServerChangeStatus.NOT_SUPPORTED, raftActor.getLeaderId()), raftActor.getSelf());
+        } else if(!raftContext.getPeerIds().contains(removeServer.getServerId())) {
+            sender.tell(new RemoveServerReply(ServerChangeStatus.DOES_NOT_EXIST, raftActor.getLeaderId()), raftActor.getSelf());
+        } else {
+            onNewOperation(raftActor, new RemoveServerContext(removeServer, raftContext.getPeerAddress(removeServer.getServerId()), sender));
+        }
+    }
+
     private boolean onApplyState(ApplyState applyState, RaftActor raftActor) {
         Payload data = applyState.getReplicatedLogEntry().getData();
         if(data instanceof ServerConfigurationPayload) {
@@ -196,6 +215,8 @@ class RaftActorServerConfigurationSupport {
                 sendReply(raftActor, operationContext, replyStatus);
             }
 
+            operationContext.operationComplete(raftActor, replyStatus);
+
             currentOperationState = IDLE;
 
             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
@@ -248,7 +269,7 @@ class RaftActorServerConfigurationSupport {
             // Sanity check - we could get an ApplyState from a previous operation that timed out so make
             // sure it's meant for us.
             if(operationContext.getContextId().equals(applyState.getIdentifier())) {
-                LOG.info("{}: {} has been successfully replicated to a majority of followers",
+                LOG.info("{}: {} has been successfully replicated to a majority of followers", raftActor.getId(),
                         applyState.getReplicatedLogEntry().getData());
 
                 operationComplete(raftActor, operationContext, null);
@@ -295,6 +316,7 @@ class RaftActorServerConfigurationSupport {
             operationComplete(raftActor, getAddServerContext(),
                     isLeader ? ServerChangeStatus.TIMEOUT : ServerChangeStatus.NO_LEADER);
         }
+
     }
 
     /**
@@ -464,6 +486,8 @@ class RaftActorServerConfigurationSupport {
         abstract Object newReply(ServerChangeStatus status, String leaderId);
 
         abstract InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support);
+
+        abstract void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus);
     }
 
     /**
@@ -483,5 +507,65 @@ class RaftActorServerConfigurationSupport {
         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
             return support.new InitialAddServerState(this);
         }
+
+        @Override
+        void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
+
+        }
+    }
+
+    private abstract class RemoveServerState extends AbstractOperationState {
+        private final RemoveServerContext removeServerContext;
+
+
+        protected RemoveServerState(RemoveServerContext removeServerContext) {
+            this.removeServerContext = Preconditions.checkNotNull(removeServerContext);
+
+        }
+
+        public RemoveServerContext getRemoveServerContext() {
+            return removeServerContext;
+        }
+
+    }
+
+    private class InitialRemoveServerState extends RemoveServerState implements InitialOperationState{
+
+        protected InitialRemoveServerState(RemoveServerContext removeServerContext) {
+            super(removeServerContext);
+        }
+
+        @Override
+        public void initiate(RaftActor raftActor) {
+            raftContext.removePeer(getRemoveServerContext().getOperation().getServerId());
+            persistNewServerConfiguration(raftActor, getRemoveServerContext());
+        }
+    }
+
+    private static class RemoveServerContext extends ServerOperationContext<RemoveServer> {
+        private final String peerAddress;
+
+        RemoveServerContext(RemoveServer operation, String peerAddress, ActorRef clientRequestor) {
+            super(operation, clientRequestor);
+            this.peerAddress = peerAddress;
+        }
+
+        @Override
+        Object newReply(ServerChangeStatus status, String leaderId) {
+            return new RemoveServerReply(status, leaderId);
+        }
+
+        @Override
+        InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
+            return support.new InitialRemoveServerState(this);
+        }
+
+        @Override
+        void operationComplete(RaftActor raftActor, ServerChangeStatus serverChangeStatus) {
+            if(peerAddress != null) {
+                raftActor.context().actorSelection(peerAddress).tell(new ServerRemoved(getOperation().getServerId()), raftActor.getSelf());
+            }
+        }
+
     }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServer.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServer.java
new file mode 100644 (file)
index 0000000..9f4977e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Message sent to remove a replica (§4.1).
+ */
+public class RemoveServer implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String serverId;
+
+    public RemoveServer(String serverId) {
+        this.serverId = Preconditions.checkNotNull(serverId);
+    }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    @Override
+    public String toString() {
+        return "RemoveServer{" + "serverId='" + serverId + '\'' + '}';
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServerReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RemoveServerReply.java
new file mode 100644 (file)
index 0000000..36a48fc
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import javax.annotation.Nullable;
+
+/**
+ * Reply to a RemoveServer message (§4.1).
+ */
+public class RemoveServerReply extends AbstractServerChangeReply {
+    private static final long serialVersionUID = 1L;
+
+    public RemoveServerReply(ServerChangeStatus status, @Nullable String leaderHint) {
+        super(status, leaderHint);
+    }
+
+    @Override
+    public String toString() {
+        return "RemoveServerReply{" + "status=" + getStatus() + ", leaderHint='" + getLeaderHint() + '\'' + '}';
+    }
+}
index ca2f35cbf6a7a50542e22cc3e6fc336f17b8fb0e..64a0f66fc2643b4d8c0227483ad2be84b22f860b 100644 (file)
@@ -16,5 +16,8 @@ public enum ServerChangeStatus {
     OK,
     NO_LEADER,
     TIMEOUT,
-    ALREADY_EXISTS
+    ALREADY_EXISTS,
+    DOES_NOT_EXIST,  // Server with the specified address does not exist
+    NOT_SUPPORTED,   // Some types of RemoveServer for example Removing the current Leader may not be
+                     // supported (at least initially)
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ServerRemoved.java
new file mode 100644 (file)
index 0000000..fba601c
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.messages;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * The ServerRemoved message is sent to a server which has been removed successfully from the ServerConfiguration.
+ * The Server can then choose to self destruct or notify it's parents as needed.
+ */
+public class ServerRemoved implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String serverId;
+
+    public ServerRemoved(String serverId) {
+        this.serverId = Preconditions.checkNotNull(serverId);
+    }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    @Override
+    public String toString() {
+        return "ServerRemoved{" +
+                "serverId='" + serverId + '\'' +
+                '}';
+    }
+}
index df526d8c52a0773466175b098c585ecd90646e99..36e6299265ae4128ce30782fde95c19646d7b5eb 100644 (file)
@@ -44,7 +44,10 @@ import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.FollowerCatchUpTimeout;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
+import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -564,7 +567,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
-                MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
+                MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(LEADER_ID));
         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
@@ -625,7 +628,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 actorFactory.generateActorId(LEADER_ID));
 
         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
-                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
+                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
                         configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(FOLLOWER_ID));
         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
@@ -652,6 +655,116 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Message handled", false, handled);
     }
 
+    @Test
+    public void testRemoveServerWithNoLeader() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
+                MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+        leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
+
+        leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
+    }
+
+    @Test
+    public void testRemoveServerNonExistentServer() {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
+    }
+
+    @Test
+    public void testRemoveServerSelf() {
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
+    }
+
+    @Test
+    public void testRemoveServerForwardToLeader() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.<String, String>of(),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
+                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
+                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(FOLLOWER_ID));
+
+
+        followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, (short) 0), leaderActor);
+
+        followerRaftActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
+    }
+
+    @Test
+    public void testRemoveServer() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        final String followerActorId = actorFactory.generateActorId(FOLLOWER_ID);
+        final String followerActorPath = actorFactory.createTestActorPath(followerActorId);
+        RaftActorContext initialActorContext = new MockRaftActorContext();
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActorPath),
+                        initialActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
+
+        TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
+                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                followerActorId);
+
+        TestActorRef<MessageCollectorActor> collector =
+                actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
+
+        followerRaftActor.underlyingActor().setCollectorActor(collector);
+
+        leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
+        RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
+
+        assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
+
+        final ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollector, ApplyState.class);
+        assertEquals(0L, applyState.getReplicatedLogEntry().getIndex());
+        verifyServerConfigurationPayloadEntry(leaderActor.underlyingActor().getRaftActorContext().getReplicatedLog(), votingServer(LEADER_ID));
+
+        MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
+    }
+
     private ServerInfo votingServer(String id) {
         return new ServerInfo(id, true);
     }
@@ -716,6 +829,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         }
     }
 
+    public static class CollectingMockRaftActor extends AbstractMockRaftActor {
+
+        CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
+        }
+
+        public static Props props(final String id, final Map<String, String> peerAddresses,
+                                  ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
+
+            return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
+        }
+
+    }
+
     public static class MockLeaderRaftActor extends AbstractMockRaftActor {
         public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
                 RaftActorContext fromContext) {
index 79df92b7d4d64280f9795dfa64e0e0cd507d8fc1..4ca018c5e2a29634f37a716fe889367a3b0f789d 100644 (file)
@@ -141,6 +141,10 @@ public class TestActorFactory implements AutoCloseable {
         killActor(actor, kit, true);
     }
 
+    public String createTestActorPath(String actorId){
+        return "akka://test/user/" + actorId;
+    }
+
     private void killActor(ActorRef actor, JavaTestKit kit, boolean remove) {
         LOG.info("Killing actor {}", actor);
         kit.watch(actor);