Merge "Bug 1577: Gates access to Shard actor until its initialized"
authorMoiz Raja <moraja@cisco.com>
Wed, 15 Oct 2014 19:36:01 +0000 (19:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 15 Oct 2014 19:36:01 +0000 (19:36 +0000)
21 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java
new file mode 100644 (file)
index 0000000..365a5bd
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedPersistentActorWithMetering extends AbstractUntypedPersistentActor {
+
+    public AbstractUntypedPersistentActorWithMetering() {
+        if (isMetricsCaptureEnabled())
+            getContext().become(new MeteringBehavior(this));
+    }
+
+    private boolean isMetricsCaptureEnabled(){
+        CommonConfig config = new CommonConfig(getContext().system().settings().config());
+        return config.isMetricCaptureEnabled();
+    }
+}
index c780881a2ffad1ed50695b7a38111068ec2f8e3f..5195a2f918c6248463e26663b953378948c8e0dc 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -76,44 +77,44 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         Preconditions.checkNotNull(path, "path should not be null");
         Preconditions.checkNotNull(listener, "listener should not be null");
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-        }
-        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(listener ));
+
+        LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        Future future = actorContext.executeLocalShardOperationAsync(shardName,
-            new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
-            new Timeout(actorContext.getOperationDuration().$times(
-                REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
+        Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
 
-        if (future != null) {
-            final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
+        //if shard is NOT local
+        if (!shard.isPresent()) {
+            LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
+            return new NoOpDataChangeListenerRegistration(listener);
+        }
+        //if shard is local
+        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
+        Future future = actorContext.executeOperationAsync(shard.get(),
+                new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+                new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
+
+        final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
                 new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
 
-            future.onComplete(new OnComplete(){
+        future.onComplete(new OnComplete() {
 
-                @Override public void onComplete(Throwable failure, Object result)
+            @Override
+            public void onComplete(Throwable failure, Object result)
                     throws Throwable {
-                    if(failure != null){
-                        LOG.error("Failed to register listener at path " + path.toString(), failure);
-                        return;
-                    }
-                    RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-                    listenerRegistrationProxy.setListenerRegistrationActor(actorContext
-                        .actorSelection(reply.getListenerRegistrationPath()));
+                if (failure != null) {
+                    LOG.error("Failed to register listener at path " + path.toString(), failure);
+                    return;
                 }
-            }, actorContext.getActorSystem().dispatcher());
-            return listenerRegistrationProxy;
-        }
-        if(LOG.isDebugEnabled()) {
-            LOG.debug(
-                "No local shard for shardName {} was found so returning a noop registration",
-                shardName);
-        }
-        return new NoOpDataChangeListenerRegistration(listener);
+                RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+                listenerRegistrationProxy.setListenerRegistrationActor(actorContext
+                        .actorSelection(reply.getListenerRegistrationPath()));
+            }
+        }, actorContext.getActorSystem().dispatcher());
+
+        return listenerRegistrationProxy;
+
     }
 
     @Override
index a3109b66b1116853ebb6977c99f72527468fea44..3934489646530c136d9cc3bfc2e41e8028cbb9ba 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -222,7 +223,9 @@ public class Shard extends RaftActor {
                 getLeader().forward(message, getContext());
             } else {
                 getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
-                    "Could not find leader so transaction cannot be created")), getSelf());
+                    "Could not find shard leader so transaction cannot be created. This typically happens" +
+                            " when system is coming up or recovering and a leader is being elected. Try again" +
+                            " later.")), getSelf());
             }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
@@ -522,6 +525,9 @@ public class Shard extends RaftActor {
         recoveryCoordinator = null;
         currentLogRecoveryBatch = null;
         updateJournalStats();
+
+        //notify shard manager
+        getContext().parent().tell(new ActorInitialized(), getSelf());
     }
 
     @Override
index a8a182380911db26f8c9530e363441d2d2b491cc..e68628dbf5c1fc992afb30ae986fff8ed8f6eef1 100644 (file)
@@ -24,11 +24,13 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -59,7 +61,7 @@ import java.util.Set;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedPersistentActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
@@ -127,6 +129,8 @@ public class ShardManager extends AbstractUntypedPersistentActor {
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
+        } else if(message instanceof ActorInitialized) {
+            onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -139,6 +143,31 @@ public class ShardManager extends AbstractUntypedPersistentActor {
 
     }
 
+    private void onActorInitialized(Object message) {
+        final ActorRef sender = getSender();
+
+        if (sender == null) {
+            return; //why is a non-actor sending this message? Just ignore.
+        }
+
+        String actorName = sender.path().name();
+        //find shard name from actor name; actor name is stringified shardId
+        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+        if (shardId.getShardName() == null) {
+            return;
+        }
+        markShardAsInitialized(shardId.getShardName());
+    }
+
+    @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+        LOG.debug("Initializing shard [{}]", shardName);
+        ShardInformation shardInformation = localShards.get(shardName);
+        if (shardInformation != null) {
+            shardInformation.setShardInitialized(true);
+        }
+    }
+
     @Override protected void handleRecover(Object message) throws Exception {
 
         if(message instanceof SchemaContextModules){
@@ -157,16 +186,23 @@ public class ShardManager extends AbstractUntypedPersistentActor {
     }
 
     private void findLocalShard(FindLocalShard message) {
-        ShardInformation shardInformation =
-            localShards.get(message.getShardName());
+        ShardInformation shardInformation = localShards.get(message.getShardName());
 
-        if(shardInformation != null){
-            getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+        if(shardInformation == null){
+            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
             return;
         }
 
-        getSender().tell(new LocalShardNotFound(message.getShardName()),
-            getSelf());
+        sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+    }
+
+    private void sendResponse(ShardInformation shardInformation, Object message) {
+        if (!shardInformation.isShardInitialized()) {
+            getSender().tell(new ActorNotInitialized(), getSelf());
+            return;
+        }
+
+        getSender().tell(message, getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -176,7 +212,7 @@ public class ShardManager extends AbstractUntypedPersistentActor {
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
-        memberNameToAddress.put(memberName , message.member().address());
+        memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
@@ -229,28 +265,27 @@ public class ShardManager extends AbstractUntypedPersistentActor {
     }
 
     private void findPrimary(FindPrimary message) {
+        final ActorRef sender = getSender();
         String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
         ShardInformation info = localShards.get(shardName);
-        if(info != null) {
+        if (info != null) {
             ActorPath shardPath = info.getActorPath();
-            if (shardPath != null) {
-                getSender()
-                    .tell(
-                        new PrimaryFound(shardPath.toString()).toSerializable(),
-                        getSelf());
-                return;
-            }
+            sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+            return;
         }
 
-        List<String> members =
-            configuration.getMembersFromShardName(shardName);
+        List<String> members = configuration.getMembersFromShardName(shardName);
 
         if(cluster.getCurrentMemberName() != null) {
             members.remove(cluster.getCurrentMemberName());
         }
 
+        /**
+         * FIXME: Instead of sending remote shard actor path back to sender,
+         * forward FindPrimary message to remote shard manager
+         */
         // There is no way for us to figure out the primary (for now) so assume
         // that one of the remote nodes is a primary
         for(String memberName : members) {
@@ -376,6 +411,7 @@ public class ShardManager extends AbstractUntypedPersistentActor {
         private final ActorRef actor;
         private final ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
+        private boolean shardInitialized = false; //flag that determines if the actor is ready for business
 
         private ShardInformation(String shardName, ActorRef actor,
             Map<ShardIdentifier, String> peerAddresses) {
@@ -413,6 +449,14 @@ public class ShardManager extends AbstractUntypedPersistentActor {
 
             }
         }
+
+        public boolean isShardInitialized() {
+            return shardInitialized;
+        }
+
+        public void setShardInitialized(boolean shardInitialized) {
+            this.shardInitialized = shardInitialized;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
index a7a5b31b174e4e0d03db192aa367a43ebe67ad62..515be372e8c76b7008b7356a666bbf94e19a1972 100644 (file)
@@ -157,7 +157,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho
             }
             ActorSelection cohort = actorContext.actorSelection(actorPath);
 
-            futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
+            futureList.add(actorContext.executeOperationAsync(cohort, message));
         }
 
         return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
index 6cf16b44268c6c16e26e0658632f61994ee33971..19d9a66a528eb417d5bff41948641b68e9c8e481 100644 (file)
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -156,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendRemoteOperationAsync(actor,
+                    actorContext.sendOperationAsync(actor,
                             new CloseTransaction().toSerializable());
                 }
             }
@@ -379,9 +380,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         try {
-            Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
-                    getTransactionChainId()).toSerializable());
+            Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
+            if (!primaryShard.isPresent()) {
+                throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+            }
+
+            Object response = actorContext.executeOperation(primaryShard.get(),
+                    new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                            getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -502,7 +508,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} closeTransaction called", identifier);
             }
-            actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
@@ -513,7 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
+            final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
                     new ReadyTransaction().toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
@@ -576,8 +582,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable() ));
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+                    new DeleteData(path).toSerializable()));
         }
 
         @Override
@@ -585,7 +591,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
                     new MergeData(path, data, schemaContext).toSerializable()));
         }
 
@@ -594,7 +600,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
                     new WriteData(path, data, schemaContext).toSerializable()));
         }
 
@@ -686,7 +692,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
                     new ReadData(path).toSerializable());
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
@@ -773,7 +779,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> future = actorContext.executeOperationAsync(getActor(),
                     new DataExists(path).toSerializable());
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java
new file mode 100644 (file)
index 0000000..302d684
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class NotInitializedException extends RuntimeException {
+    public NotInitializedException(String message) {
+        super(message);
+    }
+}
index c6928815938058dbddc7bdb8b920c60fa2e0f7da..d65af61ba3f8a957b6f67bdc355035fa93297431 100644 (file)
@@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore.identifiers;
 
 import com.google.common.base.Preconditions;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class ShardIdentifier {
     private final String shardName;
     private final String memberName;
     private final String type;
 
+    //format and pattern should be in sync
+    private final String format = "%s-shard-%s-%s";
+    private static final Pattern pattern = Pattern.compile("(\\S+)-shard-(\\S+)-(\\S+)");
 
     public ShardIdentifier(String shardName, String memberName, String type) {
 
@@ -60,15 +66,31 @@ public class ShardIdentifier {
     }
 
     @Override public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append(memberName).append("-shard-").append(shardName).append("-").append(type);
-        return builder.toString();
+        //ensure the output of toString matches the pattern above
+        return new StringBuilder(memberName)
+                    .append("-shard-")
+                    .append(shardName)
+                    .append("-")
+                    .append(type)
+                    .toString();
     }
 
     public static Builder builder(){
         return new Builder();
     }
 
+    public String getShardName() {
+        return shardName;
+    }
+
+    public String getMemberName() {
+        return memberName;
+    }
+
+    public String getType() {
+        return type;
+    }
+
     public static class Builder {
         private String shardName;
         private String memberName;
@@ -93,5 +115,15 @@ public class ShardIdentifier {
             return this;
         }
 
+        public Builder fromShardIdString(String shardId){
+            Matcher matcher = pattern.matcher(shardId);
+
+            if (matcher.matches()) {
+                memberName = matcher.group(1);
+                shardName = matcher.group(2);
+                type = matcher.group(3);
+            }
+            return this;
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java
new file mode 100644 (file)
index 0000000..b034f87
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+public class ActorInitialized implements Serializable {
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java
new file mode 100644 (file)
index 0000000..de25ef9
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+public class ActorNotInitialized implements Serializable {
+}
index 8ba333d2799a5177c7b1b8b5b09ab6b4ec87d126..44f4ef77d7ff057dbea32ebfd9d3404f5b30f862 100644 (file)
@@ -13,12 +13,14 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -101,14 +103,17 @@ public class ActorContext {
     }
 
     /**
-     * Finds the primary for a given shard
+     * Finds the primary shard for the given shard name
      *
      * @param shardName
      * @return
      */
-    public ActorSelection findPrimary(String shardName) {
-        String path = findPrimaryPath(shardName);
-        return actorSystem.actorSelection(path);
+    public Optional<ActorSelection> findPrimaryShard(String shardName) {
+        String path = findPrimaryPathOrNull(shardName);
+        if (path == null){
+            return Optional.absent();
+        }
+        return Optional.of(actorSystem.actorSelection(path));
     }
 
     /**
@@ -118,36 +123,36 @@ public class ActorContext {
      * @return a reference to a local shard actor which represents the shard
      *         specified by the shardName
      */
-    public ActorRef findLocalShard(String shardName) {
-        Object result = executeLocalOperation(shardManager,
-            new FindLocalShard(shardName));
+    public Optional<ActorRef> findLocalShard(String shardName) {
+        Object result = executeOperation(shardManager, new FindLocalShard(shardName));
 
         if (result instanceof LocalShardFound) {
             LocalShardFound found = (LocalShardFound) result;
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Local shard found {}", found.getPath());
-            }
-            return found.getPath();
+            LOG.debug("Local shard found {}", found.getPath());
+            return Optional.of(found.getPath());
         }
 
-        return null;
+        return Optional.absent();
     }
 
 
-    public String findPrimaryPath(String shardName) {
-        Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName).toSerializable());
+    private String findPrimaryPathOrNull(String shardName) {
+        Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
 
         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
             PrimaryFound found = PrimaryFound.fromSerializable(result);
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Primary found {}", found.getPrimaryPath());
-            }
+            LOG.debug("Primary found {}", found.getPrimaryPath());
             return found.getPrimaryPath();
+
+        } else if (result.getClass().equals(ActorNotInitialized.class)){
+            throw new NotInitializedException(
+                String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
+            );
+
+        } else {
+            return null;
         }
-        throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
     }
 
 
@@ -158,16 +163,25 @@ public class ActorContext {
      * @param message
      * @return The response of the operation
      */
-    public Object executeLocalOperation(ActorRef actor, Object message) {
-        Future<Object> future = ask(actor, message, operationTimeout);
+    public Object executeOperation(ActorRef actor, Object message) {
+        Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
 
         try {
             return Await.result(future, operationDuration);
         } catch (Exception e) {
-            throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+            throw new TimeoutException("Sending message " + message.getClass().toString() +
+                    " to actor " + actor.toString() + " failed. Try again later.", e);
         }
     }
 
+    public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+        Preconditions.checkArgument(message != null, "message must not be null");
+
+        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+        return ask(actor, message, timeout);
+    }
+
     /**
      * Execute an operation on a remote actor and wait for it's response
      *
@@ -175,19 +189,14 @@ public class ActorContext {
      * @param message
      * @return
      */
-    public Object executeRemoteOperation(ActorSelection actor, Object message) {
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
-                actor.toString());
-        }
-        Future<Object> future = ask(actor, message, operationTimeout);
+    public Object executeOperation(ActorSelection actor, Object message) {
+        Future<Object> future = executeOperationAsync(actor, message);
 
         try {
             return Await.result(future, operationDuration);
         } catch (Exception e) {
             throw new TimeoutException("Sending message " + message.getClass().toString() +
-                    " to actor " + actor.toString() + " failed, e);
+                    " to actor " + actor.toString() + " failed. Try again later.", e);
         }
     }
 
@@ -198,11 +207,12 @@ public class ActorContext {
      * @param message the message to send
      * @return a Future containing the eventual result
      */
-    public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
+    public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+        Preconditions.checkArgument(message != null, "message must not be null");
+
+        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
-        }
         return ask(actor, message, operationTimeout);
     }
 
@@ -213,86 +223,15 @@ public class ActorContext {
      * @param actor the ActorSelection
      * @param message the message to send
      */
-    public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
-        actor.tell(message, ActorRef.noSender());
-    }
-
-    public void sendShardOperationAsync(String shardName, Object message) {
-        ActorSelection primary = findPrimary(shardName);
-
-        primary.tell(message, ActorRef.noSender());
-    }
+    public void sendOperationAsync(ActorSelection actor, Object message) {
+        Preconditions.checkArgument(actor != null, "actor must not be null");
+        Preconditions.checkArgument(message != null, "message must not be null");
 
+        LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
 
-    /**
-     * Execute an operation on the primary for a given shard
-     * <p>
-     * This method first finds the primary for a given shard ,then sends
-     * the message to the remote shard and waits for a response
-     * </p>
-     *
-     * @param shardName
-     * @param message
-     * @return
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
-     */
-    public Object executeShardOperation(String shardName, Object message) {
-        ActorSelection primary = findPrimary(shardName);
-
-        return executeRemoteOperation(primary, message);
-    }
-
-    /**
-     * Execute an operation on the the local shard only
-     * <p>
-     *     This method first finds the address of the local shard if any. It then
-     *     executes the operation on it.
-     * </p>
-     *
-     * @param shardName the name of the shard on which the operation needs to be executed
-     * @param message the message that needs to be sent to the shard
-     * @return the message that was returned by the local actor on which the
-     *         the operation was executed. If a local shard was not found then
-     *         null is returned
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
-     *         if the operation does not complete in a specified time duration
-     */
-    public Object executeLocalShardOperation(String shardName, Object message) {
-        ActorRef local = findLocalShard(shardName);
-
-        if(local != null) {
-            return executeLocalOperation(local, message);
-        }
-
-        return null;
-    }
-
-
-    /**
-     * Execute an operation on the the local shard only asynchronously
-     *
-     * <p>
-     *     This method first finds the address of the local shard if any. It then
-     *     executes the operation on it.
-     * </p>
-     *
-     * @param shardName the name of the shard on which the operation needs to be executed
-     * @param message the message that needs to be sent to the shard
-     * @param timeout the amount of time that this method should wait for a response before timing out
-     * @return null if the shard could not be located else a future on which the caller can wait
-     *
-     */
-    public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
-        ActorRef local = findLocalShard(shardName);
-        if(local == null){
-            return null;
-        }
-        return Patterns.ask(local, message, timeout);
+        actor.tell(message, ActorRef.noSender());
     }
 
-
-
     public void shutdown() {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
@@ -337,10 +276,13 @@ public class ActorContext {
      */
     public void broadcast(Object message){
         for(String shardName : configuration.getAllShardNames()){
-            try {
-                sendShardOperationAsync(shardName, message);
-            } catch(Exception e){
-                LOG.warn("broadcast failed to send message " +  message.getClass().getSimpleName() + " to shard " + shardName, e);
+
+            Optional<ActorSelection> primary = findPrimaryShard(shardName);
+            if (primary.isPresent()) {
+                primary.get().tell(message, ActorRef.noSender());
+            } else {
+                LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
+                        message.getClass().getSimpleName(), shardName);
             }
         }
     }
index 2ed11cfbda21eff65b3cb6223b671731c891ab67..c79d76203589c51126dbd281adfd7dcaf4712db4 100644 (file)
@@ -82,7 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages");
+            .executeOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);
 
index ab3ff795d3cb4a4e66e3ddd708236a8d15eec365..aaf080bdf7d8d50de4a3f31713143389994872a6 100644 (file)
@@ -66,7 +66,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages");
+            .executeOperation(actorRef, "messages");
 
         assertNotNull(messages);
 
@@ -95,7 +95,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         ActorContext
             testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages");
+            .executeOperation(actorRef, "messages");
 
         assertNotNull(messages);
 
index 08c3ea9602adb9cd891f9e1fe573ded671e5d6d7..d57a5eea4a684c2572376e303f227907426b10ce 100644 (file)
@@ -8,6 +8,7 @@ import akka.actor.Props;
 import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
 import akka.util.Timeout;
+import com.google.common.base.Optional;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
 import org.junit.Before;
@@ -97,11 +98,11 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
         ListenerRegistration registration =
                 distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
-            @Override
-            public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-                throw new UnsupportedOperationException("onDataChanged");
-            }
-        }, AsyncDataBroker.DataChangeScope.BASE);
+                    @Override
+                    public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+                        throw new UnsupportedOperationException("onDataChanged");
+                    }
+                }, AsyncDataBroker.DataChangeScope.BASE);
 
         // Since we do not expect the shard to be local registration will return a NoOpRegistration
         assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
@@ -119,8 +120,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         Future future = mock(Future.class);
         when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
         when(actorContext.getActorSystem()).thenReturn(getSystem());
+        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
         when(actorContext
-            .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(future);
+                .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
 
         ListenerRegistration registration =
             distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
@@ -153,8 +155,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         when(actorSystem.dispatcher()).thenReturn(executor);
         when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
         when(actorContext.getActorSystem()).thenReturn(actorSystem);
+        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
         when(actorContext
-            .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
         when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
 
         ListenerRegistration registration =
@@ -195,8 +198,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         when(actorSystem.dispatcher()).thenReturn(executor);
         when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
         when(actorContext.getActorSystem()).thenReturn(actorSystem);
+        when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
         when(actorContext
-            .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+            .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
         when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
 
         ListenerRegistration registration =
index 8a3cdd0c8aa3b9890811c8a52318c8c18051d7b8..ed7b6866bf5fffff21cae1f61379a3126257ea07 100644 (file)
@@ -22,6 +22,8 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -29,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -53,6 +56,8 @@ import static org.mockito.Mockito.when;
 
 public class ShardManagerTest {
     private static ActorSystem system;
+    Configuration mockConfig = new MockConfiguration();
+    private static ActorRef defaultShardMockActor;
 
     @BeforeClass
     public static void setUpClass() {
@@ -60,13 +65,18 @@ public class ShardManagerTest {
         myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
         myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
         Config config = ConfigFactory.load()
-            .withValue("akka.persistence.journal.plugin",
-                ConfigValueFactory.fromAnyRef("my-journal"))
-            .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
+                .withValue("akka.persistence.journal.plugin",
+                        ConfigValueFactory.fromAnyRef("my-journal"))
+                .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
 
         MyJournal.clear();
 
         system = ActorSystem.create("test", config);
+
+        String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
+        defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
+
+
     }
 
     @AfterClass
@@ -86,15 +96,15 @@ public class ShardManagerTest {
         new JavaTestKit(system) {
             {
                 final Props props = ShardManager
-                    .props("config", new MockClusterWrapper(),
-                        new MockConfiguration(), new DatastoreContext());
+                        .props("config", new MockClusterWrapper(),
+                                new MockConfiguration(), new DatastoreContext());
 
                 final ActorRef subject = getSystem().actorOf(props);
 
                 subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
 
                 expectMsgEquals(duration("2 seconds"),
-                    new PrimaryNotFound("inventory").toSerializable());
+                        new PrimaryNotFound("inventory").toSerializable());
             }};
     }
 
@@ -103,17 +113,19 @@ public class ShardManagerTest {
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            subject.tell(new ActorInitialized(), defaultShardMockActor);
 
             subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
 
             expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
-        }};
+        }
+        };
     }
 
     @Test
@@ -121,8 +133,8 @@ public class ShardManagerTest {
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -150,12 +162,13 @@ public class ShardManagerTest {
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", mockClusterWrapper,
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", mockClusterWrapper,
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            subject.tell(new ActorInitialized(), defaultShardMockActor);
 
             subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
@@ -171,7 +184,7 @@ public class ShardManagerTest {
             }.get(); // this extracts the received message
 
             assertTrue(out.path().toString(),
-                out.path().toString().contains("member-1-shard-default-config"));
+                    out.path().toString().contains("member-1-shard-default-config"));
         }};
     }
 
@@ -180,8 +193,8 @@ public class ShardManagerTest {
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -211,8 +224,8 @@ public class ShardManagerTest {
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -233,14 +246,14 @@ public class ShardManagerTest {
     @Test
     public void testOnRecoveryJournalIsEmptied(){
         MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
-            ImmutableSet.of("foo")));
+                ImmutableSet.of("foo")));
 
         assertEquals(1, MyJournal.get().size());
 
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
 
             final ActorRef subject = getSystem().actorOf(props);
 
@@ -257,10 +270,10 @@ public class ShardManagerTest {
     public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
             final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+                    TestActorRef.create(system, props);
 
             subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
 
@@ -272,13 +285,13 @@ public class ShardManagerTest {
 
     @Test
     public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
-        throws Exception {
+            throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
             final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+                    TestActorRef.create(system, props);
 
             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
@@ -318,13 +331,13 @@ public class ShardManagerTest {
 
     @Test
     public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
-        throws Exception {
+            throws Exception {
         new JavaTestKit(system) {{
             final Props props = ShardManager
-                .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), new DatastoreContext());
+                    .props("config", new MockClusterWrapper(),
+                            new MockConfiguration(), new DatastoreContext());
             final TestActorRef<ShardManager> subject =
-                TestActorRef.create(system, props);
+                    TestActorRef.create(system, props);
 
             Collection<String> knownModules = subject.underlyingActor().getKnownModules();
 
@@ -386,7 +399,7 @@ public class ShardManagerTest {
         }
 
         @Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
-            final Procedure<PersistentRepr> replayCallback) {
+                                                            final Procedure<PersistentRepr> replayCallback) {
             if(journal.size() == 0){
                 return Futures.successful(null);
             }
@@ -395,8 +408,8 @@ public class ShardManagerTest {
                 public Void call() throws Exception {
                     for (Map.Entry<Long, Object> entry : journal.entrySet()) {
                         PersistentRepr persistentMessage =
-                            new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
-                                false, null, null);
+                                new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+                                        false, null, null);
                         replayCallback.apply(persistentMessage);
                     }
                     return null;
@@ -409,7 +422,7 @@ public class ShardManagerTest {
         }
 
         @Override public Future<Void> doAsyncWriteMessages(
-            final Iterable<PersistentRepr> persistentReprs) {
+                final Iterable<PersistentRepr> persistentReprs) {
             return Futures.future(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
@@ -424,12 +437,12 @@ public class ShardManagerTest {
         }
 
         @Override public Future<Void> doAsyncWriteConfirmations(
-            Iterable<PersistentConfirmation> persistentConfirmations) {
+                Iterable<PersistentConfirmation> persistentConfirmations) {
             return Futures.successful(null);
         }
 
         @Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
-            boolean b) {
+                                                            boolean b) {
             clear();
             return Futures.successful(null);
         }
index 1cd0f85fa1917057f77144f23009a7edb65c0150..3c9d857fe81db981d341233e2c7249f06273bee6 100644 (file)
@@ -91,12 +91,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
                     .successful(((SerializableMessage) responses[i]).toSerializable()));
         }
 
-        stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
+        stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
                 isA(requestType));
     }
 
     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
-        verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
+        verify(actorContext, times(nCohorts)).executeOperationAsync(
                 any(ActorSelection.class), isA(requestType));
     }
 
index e5392e025158704f44d152306aaa727b64d460e8..bdcca42d15c45553d6feb8ff6cc9bd7a792795dc 100644 (file)
@@ -1,28 +1,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.dispatch.Futures;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-
 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
@@ -52,22 +41,29 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractActorTest {
@@ -224,11 +220,17 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
         doReturn(getSystem().actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
+
+        doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+                when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
         doReturn(createTransactionReply(actorRef)).when(mockActorContext).
-                executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+                executeOperation(eq(getSystem().actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
+
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
                 anyString(), eq(actorRef.path().toString()));
+
         doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
 
         return actorRef;
@@ -252,7 +254,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
@@ -262,7 +264,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
         readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
@@ -277,7 +279,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any());
+                executeOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -290,7 +292,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any());
+                executeOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -300,12 +302,17 @@ public class TransactionProxyTest extends AbstractActorTest {
 
     private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
             throws Throwable {
+        ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
 
-        doThrow(exToThrow).when(mockActorContext).executeShardOperation(
-                anyString(), any());
+        if (exToThrow instanceof PrimaryNotFoundException) {
+            doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+        } else {
+            doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+                    when(mockActorContext).findPrimaryShard(anyString());
+        }
+        doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
 
         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
@@ -341,13 +348,13 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -360,7 +367,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         try {
             propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
         } finally {
-            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+            verify(mockActorContext, times(0)).executeOperationAsync(
                     eq(actorSelection(actorRef)), eqReadData());
         }
     }
@@ -371,10 +378,10 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(expectedNode));
 
-        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -406,14 +413,14 @@ public class TransactionProxyTest extends AbstractActorTest {
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
 
-        doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists());
 
         Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
 
         assertEquals("Exists response", false, exists);
 
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists());
 
         exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
@@ -436,7 +443,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any());
+                executeOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -449,7 +456,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(any(ActorSelection.class), any());
+                executeOperationAsync(any(ActorSelection.class), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 READ_ONLY);
@@ -463,13 +470,13 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+                executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
 
-        doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -482,7 +489,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         try {
             propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
         } finally {
-            verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+            verify(mockActorContext, times(0)).executeOperationAsync(
                     eq(actorSelection(actorRef)), eqDataExists());
         }
     }
@@ -493,10 +500,10 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
-        doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -547,7 +554,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -555,7 +562,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeRemoteOperationAsync(
+        verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
@@ -590,7 +597,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -598,7 +605,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
-        verify(mockActorContext).executeRemoteOperationAsync(
+        verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
@@ -609,7 +616,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testDelete() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
 
-        doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDeleteData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -617,7 +624,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
-        verify(mockActorContext).executeRemoteOperationAsync(
+        verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
@@ -656,13 +663,13 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -691,13 +698,13 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+                executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
-        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -726,11 +733,11 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
 
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -753,8 +760,9 @@ public class TransactionProxyTest extends AbstractActorTest {
     @Test
     public void testReadyWithInitialCreateTransactionFailure() throws Exception {
 
-        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
-                anyString(), any());
+        doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+//        doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+//                anyString(), any());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
                 WRITE_ONLY);
@@ -783,11 +791,11 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
 
         doReturn(Futures.successful(new Object())).when(mockActorContext).
-                executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+                executeOperationAsync(eq(actorSelection(actorRef)),
                         isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -820,7 +828,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testClose() throws Exception{
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
 
-        doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+        doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@@ -830,7 +838,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         transactionProxy.close();
 
-        verify(mockActorContext).sendRemoteOperationAsync(
+        verify(mockActorContext).sendOperationAsync(
                 eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
     }
 }
index afcd045434450d4dee2e89a8efb13b7866e77320..0b5e7132c77791cab3e2c4276fe1fb966bb6445e 100644 (file)
@@ -14,5 +14,14 @@ public class ShardIdentifierTest {
         assertEquals("member-1-shard-inventory-config", id.toString());
     }
 
+    @Test
+    public void testFromShardIdString(){
+        String shardIdStr = "member-1-shard-inventory-config";
+
+        ShardIdentifier id = ShardIdentifier.builder().fromShardIdString(shardIdStr).build();
 
+        assertEquals("member-1", id.getMemberName());
+        assertEquals("inventory", id.getShardName());
+        assertEquals("config", id.getType());
+    }
 }
index 5d8fb8393d6c4fd773a0a94b6fd156ac02fe1c14..fa6d0b060f222f28ec723f1b4bc52b2f1ade3aa4 100644 (file)
@@ -1,6 +1,5 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
-import java.util.concurrent.TimeUnit;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
@@ -8,7 +7,7 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
-
+import com.google.common.base.Optional;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
@@ -16,12 +15,14 @@ import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class ActorContextTest extends AbstractActorTest{
@@ -100,63 +101,6 @@ public class ActorContextTest extends AbstractActorTest{
         }
     }
 
-    @Test
-    public void testExecuteLocalShardOperationWithShardFound(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
-
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(true, shardActorRef));
-
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
-
-                    Object out = actorContext.executeLocalShardOperation("default", "hello");
-
-                    assertEquals("hello", out);
-
-
-                    expectNoMsg();
-                }
-            };
-        }};
-
-    }
-
-    @Test
-    public void testExecuteLocalShardOperationWithShardNotFound(){
-        new JavaTestKit(getSystem()) {{
-
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-
-                    ActorRef shardManagerActorRef = getSystem()
-                        .actorOf(MockShardManager.props(false, null));
-
-                    ActorContext actorContext =
-                        new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
-                            mock(Configuration.class));
-
-                    Object out = actorContext.executeLocalShardOperation("default", "hello");
-
-                    assertNull(out);
-
-
-                    expectNoMsg();
-                }
-            };
-        }};
-
-    }
-
-
     @Test
     public void testFindLocalShardWithShardFound(){
         new JavaTestKit(getSystem()) {{
@@ -174,9 +118,9 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.findLocalShard("default");
+                    Optional<ActorRef> out = actorContext.findLocalShard("default");
 
-                    assertEquals(shardActorRef, out);
+                    assertEquals(shardActorRef, out.get());
 
 
                     expectNoMsg();
@@ -201,11 +145,8 @@ public class ActorContextTest extends AbstractActorTest{
                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
                             mock(Configuration.class));
 
-                    Object out = actorContext.findLocalShard("default");
-
-                    assertNull(out);
-
-
+                    Optional<ActorRef> out = actorContext.findLocalShard("default");
+                    assertTrue(!out.isPresent());
                     expectNoMsg();
                 }
             };
@@ -232,7 +173,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Object out = actorContext.executeRemoteOperation(actor, "hello");
+                    Object out = actorContext.executeOperation(actor, "hello");
 
                     assertEquals("hello", out);
 
@@ -261,7 +202,7 @@ public class ActorContextTest extends AbstractActorTest{
 
                     ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
 
-                    Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
+                    Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
 
                     try {
                         Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
index 8fa3a17f901541f79be1ec45a373ebe097ba69a1..81b6bccaf08ea0f35b5d4ed8c8a0fea8f3524796 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import com.google.common.base.Optional;
 
 public class MockActorContext extends ActorContext {
 
@@ -30,19 +31,13 @@ public class MockActorContext extends ActorContext {
         super(actorSystem, shardManager, new MockClusterWrapper(), new MockConfiguration());
     }
 
-
-    @Override public Object executeShardOperation(String shardName,
-        Object message) {
-        return executeShardOperationResponse;
-    }
-
-    @Override public Object executeRemoteOperation(ActorSelection actor,
-        Object message) {
+    @Override public Object executeOperation(ActorSelection actor,
+                                             Object message) {
         return executeRemoteOperationResponse;
     }
 
-    @Override public ActorSelection findPrimary(String shardName) {
-        return null;
+    @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
+        return Optional.absent();
     }
 
     public void setExecuteShardOperationResponse(Object response){
@@ -74,14 +69,9 @@ public class MockActorContext extends ActorContext {
     }
 
     @Override
-    public Object executeLocalOperation(ActorRef actor,
-        Object message) {
+    public Object executeOperation(ActorRef actor,
+                                   Object message) {
         return this.executeLocalOperationResponse;
     }
 
-    @Override
-    public Object executeLocalShardOperation(String shardName,
-        Object message) {
-        return this.executeLocalShardOperationResponse;
-    }
 }
index 4ddba2f1b9d773b3c4783d3e401fa7df43d32a0e..3bad4689506525bf683254534d44036ff8a61d7e 100644 (file)
@@ -21,7 +21,7 @@ public class TestUtils {
         ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
             Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
-            .executeLocalOperation(actorRef, "messages");
+            .executeOperation(actorRef, "messages");
 
         Assert.assertNotNull(messages);