Merge "Refactor snapshot code"
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 19:22:37 +0000 (19:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 26 Mar 2015 19:22:37 +0000 (19:22 +0000)
21 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBroker.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConcurrentDOMDataBrokerTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DOMConcurrentDataCommitCoordinatorTest.java with 99% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index 886c473..538f298 100644 (file)
@@ -15,12 +15,12 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
@@ -91,8 +91,9 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
 
         final long startTime = System.nanoTime();
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Boolean> futureCallback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
@@ -102,9 +103,12 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
                             new TransactionCommitFailedException(
                                             "Can Commit failed, no detailed cause available."));
                 } else {
-                    if(remaining.decrementAndGet() == 0) {
+                    if(!cohortIterator.hasNext()) {
                         // All cohorts completed successfully - we can move on to the preCommit phase
                         doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                    } else {
+                        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+                        Futures.addCallback(canCommitFuture, this, internalFutureCallbackExecutor);
                     }
                 }
             }
@@ -116,24 +120,26 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Boolean> canCommitFuture = cohort.canCommit();
-            Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
+        Futures.addCallback(canCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doPreCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we can move on to the commit phase
                     doCommit(startTime, clientSubmitFuture, transaction, cohorts);
+                } else {
+                    ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+                    Futures.addCallback(preCommitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -144,26 +150,28 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> preCommitFuture = cohort.preCommit();
-            Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> preCommitFuture = cohortIterator.next().preCommit();
+        Futures.addCallback(preCommitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void doCommit(final long startTime, final AsyncNotifyingSettableFuture clientSubmitFuture,
             final DOMDataWriteTransaction transaction,
             final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
 
+        final Iterator<DOMStoreThreePhaseCommitCohort> cohortIterator = cohorts.iterator();
+
         // Not using Futures.allAsList here to avoid its internal overhead.
-        final AtomicInteger remaining = new AtomicInteger(cohorts.size());
         FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void notUsed) {
-                if(remaining.decrementAndGet() == 0) {
+                if(!cohortIterator.hasNext()) {
                     // All cohorts completed successfully - we're done.
                     commitStatsTracker.addDuration(System.nanoTime() - startTime);
 
                     clientSubmitFuture.set();
+                } else {
+                    ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+                    Futures.addCallback(commitFuture, this, internalFutureCallbackExecutor);
                 }
             }
 
@@ -174,10 +182,8 @@ public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
             }
         };
 
-        for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
-            ListenableFuture<Void> commitFuture = cohort.commit();
-            Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
-        }
+        ListenableFuture<Void> commitFuture = cohortIterator.next().commit();
+        Futures.addCallback(commitFuture, futureCallback, internalFutureCallbackExecutor);
     }
 
     private void handleException(final AsyncNotifyingSettableFuture clientSubmitFuture,
index 681132e..8ac424a 100644 (file)
@@ -116,7 +116,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
             DataChangeScope scope) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                new RegisterChangeListener(path, dataChangeListenerActor, scope),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
index 66467af..a30b6f7 100644 (file)
@@ -677,7 +677,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                 persistenceId(), listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
index bc4c825..55a86ce 100644 (file)
@@ -43,19 +43,19 @@ import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -96,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -122,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@@ -158,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -203,13 +206,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
-                shardInfo.getShardId());
+                shardInfo.getShardName());
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
-            message.getSender().tell(new ActorNotInitialized(), getSelf());
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
         } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
@@ -297,7 +302,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -367,6 +372,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -375,8 +382,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
@@ -392,13 +403,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
@@ -461,6 +485,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
+
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
@@ -469,6 +498,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
         final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
@@ -477,10 +508,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                     }
 
                     return found;
@@ -490,38 +521,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(message, getContext());
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java
deleted file mode 100644 (file)
index 576010f..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import java.io.Serializable;
-
-public class ActorNotInitialized implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index d51d680..2c18eaa 100644 (file)
@@ -9,13 +9,14 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 
 /**
  * The FindPrimary message is used to locate the primary of any given shard
  *
  */
-public class FindPrimary implements SerializableMessage{
-    public static final Class<FindPrimary> SERIALIZABLE_CLASS = FindPrimary.class;
+public class FindPrimary implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final String shardName;
     private final boolean waitUntilReady;
@@ -36,15 +37,6 @@ public class FindPrimary implements SerializableMessage{
         return waitUntilReady;
     }
 
-    @Override
-    public Object toSerializable() {
-        return this;
-    }
-
-    public static FindPrimary fromSerializable(Object message){
-        return (FindPrimary) message;
-    }
-
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
index a556502..4c154d4 100644 (file)
@@ -8,56 +8,48 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.Serializable;
 
-public class PrimaryFound implements SerializableMessage {
-  public static final Class<PrimaryFound> SERIALIZABLE_CLASS = PrimaryFound.class;
-  private final String primaryPath;
+public class PrimaryFound implements Serializable {
+    private static final long serialVersionUID = 1L;
 
-  public PrimaryFound(final String primaryPath) {
-    this.primaryPath = primaryPath;
-  }
+    private final String primaryPath;
 
-  public String getPrimaryPath() {
-    return primaryPath;
-  }
-
-  @Override
-  public boolean equals(final Object o) {
-    if (this == o) {
-        return true;
+    public PrimaryFound(final String primaryPath) {
+        this.primaryPath = primaryPath;
     }
-    if (o == null || getClass() != o.getClass()) {
-        return false;
-    }
-
-    PrimaryFound that = (PrimaryFound) o;
 
-    if (!primaryPath.equals(that.primaryPath)) {
-        return false;
+    public String getPrimaryPath() {
+        return primaryPath;
     }
 
-    return true;
-  }
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
-  @Override
-  public int hashCode() {
-    return primaryPath.hashCode();
-  }
+        PrimaryFound that = (PrimaryFound) o;
 
-  @Override
-  public String toString() {
-    return "PrimaryFound{" +
-            "primaryPath='" + primaryPath + '\'' +
-            '}';
-  }
+        if (!primaryPath.equals(that.primaryPath)) {
+            return false;
+        }
 
+        return true;
+    }
 
-  @Override
-  public Object toSerializable() {
-    return  this;
-  }
+    @Override
+    public int hashCode() {
+        return primaryPath.hashCode();
+    }
 
-  public static PrimaryFound fromSerializable(final Object message){
-    return (PrimaryFound) message;
-  }
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("PrimaryFound [primaryPath=").append(primaryPath).append("]");
+        return builder.toString();
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java
deleted file mode 100644 (file)
index b47c91b..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-
-public class PrimaryNotFound implements SerializableMessage {
-  public static final Class<PrimaryNotFound> SERIALIZABLE_CLASS = PrimaryNotFound.class;
-
-    private final String shardName;
-
-    public PrimaryNotFound(final String shardName){
-
-        Preconditions.checkNotNull(shardName, "shardName should not be null");
-
-        this.shardName = shardName;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        PrimaryNotFound that = (PrimaryNotFound) o;
-
-        if (shardName != null ? !shardName.equals(that.shardName) : that.shardName != null) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return shardName != null ? shardName.hashCode() : 0;
-    }
-
-  @Override
-  public Object toSerializable() {
-    return this;
-  }
-
-  public static PrimaryNotFound fromSerializable(final Object message){
-    return (PrimaryNotFound) message;
-  }
-}
index dea0851..1d8edec 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
@@ -20,15 +22,15 @@ public class RegisterChangeListener implements SerializableMessage {
             ListenerRegistrationMessages.RegisterChangeListener.class;
 
     private final YangInstanceIdentifier path;
-    private final ActorPath dataChangeListenerPath;
+    private final ActorRef dataChangeListener;
     private final AsyncDataBroker.DataChangeScope scope;
 
 
     public RegisterChangeListener(YangInstanceIdentifier path,
-        ActorPath dataChangeListenerPath,
+        ActorRef dataChangeListener,
         AsyncDataBroker.DataChangeScope scope) {
         this.path = path;
-        this.dataChangeListenerPath = dataChangeListenerPath;
+        this.dataChangeListener = dataChangeListener;
         this.scope = scope;
     }
 
@@ -42,7 +44,7 @@ public class RegisterChangeListener implements SerializableMessage {
     }
 
     public ActorPath getDataChangeListenerPath() {
-        return dataChangeListenerPath;
+        return dataChangeListener.path();
     }
 
 
@@ -50,14 +52,14 @@ public class RegisterChangeListener implements SerializableMessage {
     public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
       return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
           .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
-          .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
+          .setDataChangeListenerActorPath(Serialization.serializedActorPath(dataChangeListener))
           .setDataChangeScope(scope.ordinal()).build();
     }
 
-  public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
+  public static RegisterChangeListener fromSerializable(ActorSystem actorSystem, Object serializable){
     ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
     return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
-                                                actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
+                                                actorSystem.provider().resolveActorRef(o.getDataChangeListenerActorPath()),
                                               AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
   }
 
index bbfbbaa..a2f0485 100644 (file)
@@ -9,32 +9,34 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.serialization.Serialization;
 import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
 
 public class RegisterChangeListenerReply implements SerializableMessage{
   public static final Class<ListenerRegistrationMessages.RegisterChangeListenerReply> SERIALIZABLE_CLASS =
           ListenerRegistrationMessages.RegisterChangeListenerReply.class;
-  private final ActorPath listenerRegistrationPath;
+  private final ActorRef listenerRegistration;
 
-  public RegisterChangeListenerReply(final ActorPath listenerRegistrationPath) {
-    this.listenerRegistrationPath = listenerRegistrationPath;
+  public RegisterChangeListenerReply(final ActorRef listenerRegistration) {
+    this.listenerRegistration = listenerRegistration;
   }
 
   public ActorPath getListenerRegistrationPath() {
-    return listenerRegistrationPath;
+    return listenerRegistration.path();
   }
 
   @Override
   public ListenerRegistrationMessages.RegisterChangeListenerReply toSerializable() {
     return ListenerRegistrationMessages.RegisterChangeListenerReply.newBuilder()
-            .setListenerRegistrationPath(listenerRegistrationPath.toString()).build();
+            .setListenerRegistrationPath(Serialization.serializedActorPath(listenerRegistration)).build();
   }
 
   public static RegisterChangeListenerReply fromSerializable(final ActorSystem actorSystem,final Object serializable){
     ListenerRegistrationMessages.RegisterChangeListenerReply o = (ListenerRegistrationMessages.RegisterChangeListenerReply) serializable;
     return new RegisterChangeListenerReply(
-        actorSystem.actorFor(o.getListenerRegistrationPath()).path()
+        actorSystem.provider().resolveActorRef(o.getListenerRegistrationPath())
         );
   }
 }
index 6f9bb7f..b6250fc 100644 (file)
@@ -41,13 +41,11 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -209,25 +207,22 @@ public class ActorContext {
             return ret;
         }
         Future<Object> future = executeOperationAsync(shardManager,
-                new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
+                new FindPrimary(shardName, true), shardInitializationTimeout);
 
         return future.transform(new Mapper<Object, ActorSelection>() {
             @Override
             public ActorSelection checkedApply(Object response) throws Exception {
-                if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
-                    PrimaryFound found = PrimaryFound.fromSerializable(response);
+                if(response instanceof PrimaryFound) {
+                    PrimaryFound found = (PrimaryFound)response;
 
                     LOG.debug("Primary found {}", found.getPrimaryPath());
                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
                     return actorSelection;
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found primary shard %s but it's not initialized yet. " +
-                                          "Please try again later", shardName));
-                } else if(response instanceof PrimaryNotFound) {
-                    throw new PrimaryNotFoundException(
-                            String.format("No primary shard found for %S.", shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
+                } else if(response instanceof PrimaryNotFoundException) {
+                    throw (PrimaryNotFoundException)response;
                 } else if(response instanceof NoShardLeaderException) {
                     throw (NoShardLeaderException)response;
                 }
@@ -274,10 +269,8 @@ public class ActorContext {
                     LocalShardFound found = (LocalShardFound)response;
                     LOG.debug("Local shard found {}", found.getPath());
                     return found.getPath();
-                } else if(response instanceof ActorNotInitialized) {
-                    throw new NotInitializedException(
-                            String.format("Found local shard for %s but it's not initialized yet.",
-                                    shardName));
+                } else if(response instanceof NotInitializedException) {
+                    throw (NotInitializedException)response;
                 } else if(response instanceof LocalShardNotFound) {
                     throw new LocalShardNotFoundException(
                             String.format("Local shard for %s does not exist.", shardName));
index f6c8f07..57e0e26 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -28,7 +28,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -96,7 +96,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             Assert.assertEquals("getPath", path, registerMsg.getPath());
             Assert.assertEquals("getScope", scope, registerMsg.getScope());
 
-            reply(new RegisterChangeListenerReply(getRef().path()));
+            reply(new RegisterChangeListenerReply(getRef()));
 
             for(int i = 0; (i < 20 * 5) && proxy.getListenerRegistrationActor() == null; i++) {
                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@@ -173,7 +173,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
             FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
             Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
 
-            reply(new ActorNotInitialized());
+            reply(new NotInitializedException("not initialized"));
 
             new Within(duration("1 seconds")) {
                 @Override
@@ -242,7 +242,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest {
                 @Override
                 public Future<Object> answer(InvocationOnMock invocation) {
                     proxy.close();
-                    return Futures.successful((Object)new RegisterChangeListenerReply(getRef().path()));
+                    return Futures.successful((Object)new RegisterChangeListenerReply(getRef()));
                 }
             };
 
index ae7a4f9..95b1b78 100644 (file)
@@ -9,16 +9,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.AddressFromURIString;
 import akka.actor.Props;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,15 +42,16 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -75,6 +83,11 @@ public class ShardManagerTest extends AbstractActorTest {
     private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
             dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
 
+    private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
+        String name = new ShardIdentifier(shardName, memberName,"config").toString();
+        return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
+    }
+
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
@@ -100,21 +113,22 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
+        return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
+                new MockConfiguration());
+    }
+
+    private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
+            final ClusterWrapper clusterWrapper, final Configuration config) {
         Creator<ShardManager> creator = new Creator<ShardManager>() {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
-                        datastoreContextBuilder.build(), ready) {
-                    @Override
-                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
-                        return mockShardActor;
-                    }
-                };
+                return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
+                        ready, name, shardActor);
             }
         };
 
-        return Props.create(new DelegatingShardManagerCreator(creator));
+        return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
     }
 
     @Test
@@ -124,9 +138,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary("non-existent", false), getRef());
 
-            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
         }};
     }
 
@@ -146,9 +160,9 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                     RaftState.Leader.name())), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -170,9 +184,9 @@ public class ShardManagerTest extends AbstractActorTest {
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
             shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-2-shard-default"));
         }};
@@ -183,9 +197,9 @@ public class ShardManagerTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
@@ -197,7 +211,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
         }};
@@ -215,15 +229,15 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification(memberId,
                     RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
         }};
@@ -238,7 +252,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             // We're passing waitUntilInitialized = true to FindPrimary so the response should be
             // delayed until we send ActorInitialized and RoleChangeNotification.
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
 
@@ -254,7 +268,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
 
-            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
+            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
             assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                     primaryFound.getPrimaryPath().contains("member-1-shard-default"));
 
@@ -269,9 +283,9 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
-            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("2 seconds"), NotInitializedException.class);
 
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
@@ -289,7 +303,7 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                     null, RaftState.Candidate.name()), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
@@ -303,12 +317,78 @@ public class ShardManagerTest extends AbstractActorTest {
             shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             shardManager.tell(new ActorInitialized(), mockShardActor);
 
-            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
+            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
 
             expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
         }};
     }
 
+    @Test
+    public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
+        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+
+        // Create an ActorSystem ShardManager actor for member-1.
+
+        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
+
+        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
+                newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
+                        new MockConfiguration()), shardManagerID);
+
+        // Create an ActorSystem ShardManager actor for member-2.
+
+        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+
+        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+        final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
+
+        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                put("default", Arrays.asList("member-1", "member-2")).
+                put("astronauts", Arrays.asList("member-2")).build());
+
+        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
+                newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
+                        mockConfig2), shardManagerID);
+
+        new JavaTestKit(system1) {{
+
+            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager2.tell(new ActorInitialized(), mockShardActor2);
+
+            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
+            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
+            shardManager2.tell(new RoleChangeNotification(memberId2,
+                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
+
+            shardManager1.underlyingActor().waitForMemberUp();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
+            String path = found.getPrimaryPath();
+            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
+
+            shardManager2.underlyingActor().verifyFindPrimary();
+
+            Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            shardManager1.underlyingActor().waitForMemberRemoved();
+
+            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
+
+            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
+        }};
+
+        JavaTestKit.shutdownActorSystem(system1);
+        JavaTestKit.shutdownActorSystem(system2);
+    }
+
     @Test
     public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -348,7 +428,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
 
-            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
+            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
         }};
     }
 
@@ -371,42 +451,6 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
-    @Test
-    public void testOnReceiveMemberUp() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
-                    PrimaryFound.SERIALIZABLE_CLASS));
-            String path = found.getPrimaryPath();
-            assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
-        }};
-    }
-
-    @Test
-    public void testOnReceiveMemberDown() throws Exception {
-
-        new JavaTestKit(getSystem()) {{
-            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-
-            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-
-            MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
-
-            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
-
-            expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
-        }};
-    }
-
     @Test
     public void testOnRecoveryJournalIsCleaned() {
         InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
@@ -804,4 +848,69 @@ public class ShardManagerTest extends AbstractActorTest {
             return delegate.create();
         }
     }
+
+    private static class ForwardingShardManager extends ShardManager {
+        private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
+        private CountDownLatch memberUpReceived = new CountDownLatch(1);
+        private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
+        private final ActorRef shardActor;
+        private final String name;
+
+        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
+                DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
+                ActorRef shardActor) {
+            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
+            this.shardActor = shardActor;
+            this.name = name;
+        }
+
+        @Override
+        public void handleCommand(Object message) throws Exception {
+            try{
+                super.handleCommand(message);
+            } finally {
+                if(message instanceof FindPrimary) {
+                    findPrimaryMessageReceived.countDown();
+                } else if(message instanceof ClusterEvent.MemberUp) {
+                    String role = ((ClusterEvent.MemberUp)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberUpReceived.countDown();
+                    }
+                } else if(message instanceof ClusterEvent.MemberRemoved) {
+                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
+                    if(!getCluster().getCurrentMemberName().equals(role)) {
+                        memberRemovedReceived.countDown();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String persistenceId() {
+            return name;
+        }
+
+        @Override
+        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
+            return shardActor;
+        }
+
+        void waitForMemberUp() {
+            assertEquals("MemberUp received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
+            memberUpReceived = new CountDownLatch(1);
+        }
+
+        void waitForMemberRemoved() {
+            assertEquals("MemberRemoved received", true,
+                    Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
+            memberRemovedReceived = new CountDownLatch(1);
+        }
+
+        void verifyFindPrimary() {
+            assertEquals("FindPrimary received", true,
+                    Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
+            findPrimaryMessageReceived = new CountDownLatch(1);
+        }
+    }
 }
index adc7f47..3e0bc42 100644 (file)
@@ -120,7 +120,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -208,7 +208,7 @@ public class ShardTest extends AbstractShardTest {
                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             // Now send the RegisterChangeListener and wait for the reply.
-            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+            shard.tell(new RegisterChangeListener(path, dclActor,
                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerReplyTest.java
new file mode 100644 (file)
index 0000000..696a898
--- /dev/null
@@ -0,0 +1,58 @@
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerReplyTest extends AbstractActorTest {
+
+    private TestActorFactory factory;
+
+
+    @Before
+    public void setUp(){
+        factory = new TestActorFactory(getSystem());
+    }
+
+    @After
+    public void shutDown(){
+        factory.close();
+    }
+
+    @Test
+    public void testToSerializable(){
+        TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+        RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+        ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+                = registerChangeListenerReply.toSerializable();
+
+        assertEquals(Serialization.serializedActorPath(testActor), serialized.getListenerRegistrationPath());
+    }
+
+    @Test
+    public void testFromSerializable(){
+        TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+
+        RegisterChangeListenerReply registerChangeListenerReply = new RegisterChangeListenerReply(testActor);
+
+        ListenerRegistrationMessages.RegisterChangeListenerReply serialized
+                = registerChangeListenerReply.toSerializable();
+
+
+        RegisterChangeListenerReply fromSerialized
+                = RegisterChangeListenerReply.fromSerializable(getSystem(), serialized);
+
+        assertEquals(testActor.path().toString(), fromSerialized.getListenerRegistrationPath().toString());
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListenerTest.java
new file mode 100644 (file)
index 0000000..2354a79
--- /dev/null
@@ -0,0 +1,67 @@
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static junit.framework.TestCase.assertEquals;
+import akka.actor.Actor;
+import akka.serialization.Serialization;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class RegisterChangeListenerTest extends AbstractActorTest {
+
+    private TestActorFactory factory;
+
+    @Before
+    public void setUp(){
+        factory = new TestActorFactory(getSystem());
+    }
+
+    @After
+    public void shutDown(){
+        factory.close();
+    }
+
+    @Test
+    public void testToSerializable(){
+        TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+        RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+                , AsyncDataBroker.DataChangeScope.BASE);
+
+        ListenerRegistrationMessages.RegisterChangeListener serialized
+                = registerChangeListener.toSerializable();
+
+        NormalizedNodeMessages.InstanceIdentifier path = serialized.getInstanceIdentifierPath();
+
+        assertEquals("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", path.getCode(0));
+        assertEquals(Serialization.serializedActorPath(testActor), serialized.getDataChangeListenerActorPath());
+        assertEquals(AsyncDataBroker.DataChangeScope.BASE.ordinal(), serialized.getDataChangeScope());
+
+    }
+
+    @Test
+    public void testFromSerializable(){
+        TestActorRef<Actor> testActor = factory.createTestActor(MessageCollectorActor.props());
+        RegisterChangeListener registerChangeListener = new RegisterChangeListener(TestModel.TEST_PATH, testActor
+                , AsyncDataBroker.DataChangeScope.SUBTREE);
+
+        ListenerRegistrationMessages.RegisterChangeListener serialized
+                = registerChangeListener.toSerializable();
+
+
+        RegisterChangeListener fromSerialized = RegisterChangeListener.fromSerializable(getSystem(), serialized);
+
+        assertEquals(TestModel.TEST_PATH, registerChangeListener.getPath());
+        assertEquals(testActor.path().toString(), fromSerialized.getDataChangeListenerPath().toString());
+        assertEquals(AsyncDataBroker.DataChangeScope.SUBTREE, fromSerialized.getScope());
+
+
+    }
+}
\ No newline at end of file
index 2746bcf..6b4f633 100644 (file)
@@ -37,13 +37,11 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -458,7 +456,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                            return Futures.successful((Object) new PrimaryNotFoundException("not found"));
                         }
                     };
 
@@ -491,7 +489,7 @@ public class ActorContextTest extends AbstractActorTest{
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new ActorNotInitialized());
+                            return Futures.successful((Object) new NotInitializedException("not iniislized"));
                         }
                     };
 
@@ -518,8 +516,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
+            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);
index fe40aa0..810b270 100644 (file)
@@ -14,14 +14,22 @@ import akka.actor.AddressFromURIString;
 import akka.cluster.ClusterEvent;
 import akka.cluster.MemberStatus;
 import akka.cluster.UniqueAddress;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import scala.collection.JavaConversions;
 import java.util.HashSet;
 import java.util.Set;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
 
 public class MockClusterWrapper implements ClusterWrapper{
 
     private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+    private String currentMemberName = "member-1";
+
+    public MockClusterWrapper() {
+    }
+
+    public MockClusterWrapper(String currentMemberName) {
+        this.currentMemberName = currentMemberName;
+    }
 
     @Override
     public void subscribeToMemberEvents(ActorRef actorRef) {
@@ -29,7 +37,7 @@ public class MockClusterWrapper implements ClusterWrapper{
 
     @Override
     public String getCurrentMemberName() {
-        return "member-1";
+        return currentMemberName;
     }
 
     @Override
index 4ef7d65..0bc561f 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -18,11 +20,23 @@ import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
 public class MockConfiguration implements Configuration{
-    @Override public List<String> getMemberShardNames(final String memberName) {
-        return Arrays.asList("default");
+    private Map<String, List<String>> shardMembers = ImmutableMap.<String, List<String>>builder().
+            put("default", Arrays.asList("member-1", "member-2")).
+            /*put("astronauts", Arrays.asList("member-2", "member-3")).*/build();
+
+    public MockConfiguration() {
+    }
+
+    public MockConfiguration(Map<String, List<String>> shardMembers) {
+        this.shardMembers = shardMembers;
     }
 
-    @Override public Optional<String> getModuleNameFromNameSpace(
+    @Override
+    public List<String> getMemberShardNames(final String memberName) {
+        return new ArrayList<>(shardMembers.keySet());
+    }
+    @Override
+    public Optional<String> getModuleNameFromNameSpace(
         final String nameSpace) {
         return Optional.absent();
     }
@@ -44,7 +58,8 @@ public class MockConfiguration implements Configuration{
             return Arrays.asList("member-2", "member-3");
         }
 
-        return Collections.emptyList();
+        List<String> members = shardMembers.get(shardName);
+        return members != null ? members : Collections.<String>emptyList();
     }
 
     @Override public Set<String> getAllShardNames() {
index badec6f..0363462 100644 (file)
@@ -34,3 +34,105 @@ bounded-mailbox {
   mailbox-capacity = 1000
   mailbox-push-timeout-time = 100ms
 }
+
+Member1 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    loglevel = "DEBUG"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2558
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-1"
+      ]
+    }
+  }
+}
+
+Member2 {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+  
+  in-memory-journal {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemoryJournal"
+  }
+
+  in-memory-snapshot-store {
+    class = "org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore"
+    plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+  }
+  
+  akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+    persistence.journal.plugin = "in-memory-journal"
+    
+    actor {
+      provider = "akka.cluster.ClusterActorRefProvider"
+      
+      serializers {
+          java = "akka.serialization.JavaSerializer"
+          proto = "akka.remote.serialization.ProtobufSerializer"
+      }
+
+      serialization-bindings {
+          "com.google.protobuf.Message" = proto
+      }
+    }
+    remote {
+      log-remote-lifecycle-events = off
+      netty.tcp {
+        hostname = "127.0.0.1"
+        port = 2559
+      }
+    }
+
+    cluster {
+      auto-down-unreachable-after = 100s
+      
+      roles = [
+        "member-2"
+      ]
+    }
+  }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.