From 05193e1f77cb4f1be9a58b946cd11e619c10bb8c Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Sat, 20 Sep 2014 01:19:11 -0700 Subject: [PATCH] Bug 1577: Gates access to Shard actor until its initialized 1. Shard manager creates Shard and mark them un-initialized. Shard completes recovery and onRecoveryComplete, sends a message to Shard manager to mark it initialized. If a request for Shard comes to Shard manager and the shard is not initialized, it sends ActorNotInitialized message. 2. Normalizes and refactors ActorContext. 3. Adds AbstractUntypedPersistentActorWithMetering to meter ShardManager. Change-Id: Ibf15a2ef56422bda53067039d2271a719b6b2ce3 Signed-off-by: Abhishek Kumar --- ...actUntypedPersistentActorWithMetering.java | 24 +++ .../datastore/DistributedDataStore.java | 59 +++--- .../controller/cluster/datastore/Shard.java | 8 +- .../cluster/datastore/ShardManager.java | 82 ++++++-- .../ThreePhaseCommitCohortProxy.java | 2 +- .../cluster/datastore/TransactionProxy.java | 30 +-- .../exceptions/NotInitializedException.java | 14 ++ .../identifiers/ShardIdentifier.java | 38 +++- .../datastore/messages/ActorInitialized.java | 13 ++ .../messages/ActorNotInitialized.java | 13 ++ .../cluster/datastore/utils/ActorContext.java | 176 ++++++------------ .../DataChangeListenerProxyTest.java | 2 +- ...taChangeListenerRegistrationProxyTest.java | 4 +- .../datastore/DistributedDataStoreTest.java | 20 +- .../cluster/datastore/ShardManagerTest.java | 89 +++++---- .../ThreePhaseCommitCohortProxyTest.java | 4 +- .../datastore/TransactionProxyTest.java | 126 +++++++------ .../identifiers/ShardIdentifierTest.java | 9 + .../datastore/utils/ActorContextTest.java | 81 ++------ .../datastore/utils/MockActorContext.java | 24 +-- .../cluster/datastore/utils/TestUtils.java | 2 +- 21 files changed, 440 insertions(+), 380 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java new file mode 100644 index 0000000000..365a5bd015 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractUntypedPersistentActorWithMetering.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.common.actor; + +/** + * Actor with its behaviour metered. Metering is enabled by configuration. + */ +public abstract class AbstractUntypedPersistentActorWithMetering extends AbstractUntypedPersistentActor { + + public AbstractUntypedPersistentActorWithMetering() { + if (isMetricsCaptureEnabled()) + getContext().become(new MeteringBehavior(this)); + } + + private boolean isMetricsCaptureEnabled(){ + CommonConfig config = new CommonConfig(getContext().system().settings().config()); + return config.isMetricCaptureEnabled(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c780881a2f..5195a2f918 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.OnComplete; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -76,44 +77,44 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au Preconditions.checkNotNull(path, "path should not be null"); Preconditions.checkNotNull(listener, "listener should not be null"); - if(LOG.isDebugEnabled()) { - LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); - } - ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf( - DataChangeListener.props(listener )); + + LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope); String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); - Future future = actorContext.executeLocalShardOperationAsync(shardName, - new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), - new Timeout(actorContext.getOperationDuration().$times( - REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR))); + Optional shard = actorContext.findLocalShard(shardName); - if (future != null) { - final DataChangeListenerRegistrationProxy listenerRegistrationProxy = + //if shard is NOT local + if (!shard.isPresent()) { + LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName); + return new NoOpDataChangeListenerRegistration(listener); + } + //if shard is local + ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener)); + Future future = actorContext.executeOperationAsync(shard.get(), + new RegisterChangeListener(path, dataChangeListenerActor.path(), scope), + new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR))); + + final DataChangeListenerRegistrationProxy listenerRegistrationProxy = new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor); - future.onComplete(new OnComplete(){ + future.onComplete(new OnComplete() { - @Override public void onComplete(Throwable failure, Object result) + @Override + public void onComplete(Throwable failure, Object result) throws Throwable { - if(failure != null){ - LOG.error("Failed to register listener at path " + path.toString(), failure); - return; - } - RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; - listenerRegistrationProxy.setListenerRegistrationActor(actorContext - .actorSelection(reply.getListenerRegistrationPath())); + if (failure != null) { + LOG.error("Failed to register listener at path " + path.toString(), failure); + return; } - }, actorContext.getActorSystem().dispatcher()); - return listenerRegistrationProxy; - } - if(LOG.isDebugEnabled()) { - LOG.debug( - "No local shard for shardName {} was found so returning a noop registration", - shardName); - } - return new NoOpDataChangeListenerRegistration(listener); + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; + listenerRegistrationProxy.setListenerRegistrationActor(actorContext + .actorSelection(reply.getListenerRegistrationPath())); + } + }, actorContext.getActorSystem().dispatcher()); + + return listenerRegistrationProxy; + } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index a3109b66b1..3934489646 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -222,7 +223,9 @@ public class Shard extends RaftActor { getLeader().forward(message, getContext()); } else { getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( - "Could not find leader so transaction cannot be created")), getSelf()); + "Could not find shard leader so transaction cannot be created. This typically happens" + + " when system is coming up or recovering and a leader is being elected. Try again" + + " later.")), getSelf()); } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; @@ -522,6 +525,9 @@ public class Shard extends RaftActor { recoveryCoordinator = null; currentLogRecoveryBatch = null; updateJournalStats(); + + //notify shard manager + getContext().parent().tell(new ActorInitialized(), getSelf()); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index a8a1823809..e68628dbf5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,11 +24,13 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; +import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -59,7 +61,7 @@ import java.util.Set; *
  • Monitor the cluster members and store their addresses *
      */ -public class ShardManager extends AbstractUntypedPersistentActor { +public class ShardManager extends AbstractUntypedPersistentActorWithMetering { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); @@ -127,6 +129,8 @@ public class ShardManager extends AbstractUntypedPersistentActor { findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext(message); + } else if(message instanceof ActorInitialized) { + onActorInitialized(message); } else if (message instanceof ClusterEvent.MemberUp){ memberUp((ClusterEvent.MemberUp) message); } else if(message instanceof ClusterEvent.MemberRemoved) { @@ -139,6 +143,31 @@ public class ShardManager extends AbstractUntypedPersistentActor { } + private void onActorInitialized(Object message) { + final ActorRef sender = getSender(); + + if (sender == null) { + return; //why is a non-actor sending this message? Just ignore. + } + + String actorName = sender.path().name(); + //find shard name from actor name; actor name is stringified shardId + ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build(); + + if (shardId.getShardName() == null) { + return; + } + markShardAsInitialized(shardId.getShardName()); + } + + @VisibleForTesting protected void markShardAsInitialized(String shardName) { + LOG.debug("Initializing shard [{}]", shardName); + ShardInformation shardInformation = localShards.get(shardName); + if (shardInformation != null) { + shardInformation.setShardInitialized(true); + } + } + @Override protected void handleRecover(Object message) throws Exception { if(message instanceof SchemaContextModules){ @@ -157,16 +186,23 @@ public class ShardManager extends AbstractUntypedPersistentActor { } private void findLocalShard(FindLocalShard message) { - ShardInformation shardInformation = - localShards.get(message.getShardName()); + ShardInformation shardInformation = localShards.get(message.getShardName()); - if(shardInformation != null){ - getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf()); + if(shardInformation == null){ + getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; } - getSender().tell(new LocalShardNotFound(message.getShardName()), - getSelf()); + sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor())); + } + + private void sendResponse(ShardInformation shardInformation, Object message) { + if (!shardInformation.isShardInitialized()) { + getSender().tell(new ActorNotInitialized(), getSelf()); + return; + } + + getSender().tell(message, getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -176,7 +212,7 @@ public class ShardManager extends AbstractUntypedPersistentActor { private void memberUp(ClusterEvent.MemberUp message) { String memberName = message.member().roles().head(); - memberNameToAddress.put(memberName , message.member().address()); + memberNameToAddress.put(memberName, message.member().address()); for(ShardInformation info : localShards.values()){ String shardName = info.getShardName(); @@ -229,28 +265,27 @@ public class ShardManager extends AbstractUntypedPersistentActor { } private void findPrimary(FindPrimary message) { + final ActorRef sender = getSender(); String shardName = message.getShardName(); // First see if the there is a local replica for the shard ShardInformation info = localShards.get(shardName); - if(info != null) { + if (info != null) { ActorPath shardPath = info.getActorPath(); - if (shardPath != null) { - getSender() - .tell( - new PrimaryFound(shardPath.toString()).toSerializable(), - getSelf()); - return; - } + sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable()); + return; } - List members = - configuration.getMembersFromShardName(shardName); + List members = configuration.getMembersFromShardName(shardName); if(cluster.getCurrentMemberName() != null) { members.remove(cluster.getCurrentMemberName()); } + /** + * FIXME: Instead of sending remote shard actor path back to sender, + * forward FindPrimary message to remote shard manager + */ // There is no way for us to figure out the primary (for now) so assume // that one of the remote nodes is a primary for(String memberName : members) { @@ -376,6 +411,7 @@ public class ShardManager extends AbstractUntypedPersistentActor { private final ActorRef actor; private final ActorPath actorPath; private final Map peerAddresses; + private boolean shardInitialized = false; //flag that determines if the actor is ready for business private ShardInformation(String shardName, ActorRef actor, Map peerAddresses) { @@ -413,6 +449,14 @@ public class ShardManager extends AbstractUntypedPersistentActor { } } + + public boolean isShardInitialized() { + return shardInitialized; + } + + public void setShardInitialized(boolean shardInitialized) { + this.shardInitialized = shardInitialized; + } } private static class ShardManagerCreator implements Creator { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java index a7a5b31b17..515be372e8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java @@ -157,7 +157,7 @@ public class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCoho } ActorSelection cohort = actorContext.actorSelection(actorPath); - futureList.add(actorContext.executeRemoteOperationAsync(cohort, message)); + futureList.add(actorContext.executeOperationAsync(cohort, message)); } return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 6cf16b4426..19d9a66a52 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -156,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(remoteTransactionActorsMB.get()) { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); - actorContext.sendRemoteOperationAsync(actor, + actorContext.sendOperationAsync(actor, new CloseTransaction().toSerializable()); } } @@ -379,9 +380,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } try { - Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + Optional primaryShard = actorContext.findPrimaryShard(shardName); + if (!primaryShard.isPresent()) { + throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName); + } + + Object response = actorContext.executeOperation(primaryShard.get(), + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -502,7 +508,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} closeTransaction called", identifier); } - actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); + actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override @@ -513,7 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), + final Future replyFuture = actorContext.executeOperationAsync(getActor(), new ReadyTransaction().toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the @@ -576,8 +582,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new DeleteData(path).toSerializable() )); + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), + new DeleteData(path).toSerializable())); } @Override @@ -585,7 +591,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), new MergeData(path, data, schemaContext).toSerializable())); } @@ -594,7 +600,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} writeData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), new WriteData(path, data, schemaContext).toSerializable())); } @@ -686,7 +692,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - Future readFuture = actorContext.executeRemoteOperationAsync(getActor(), + Future readFuture = actorContext.executeOperationAsync(getActor(), new ReadData(path).toSerializable()); readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -773,7 +779,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - Future future = actorContext.executeRemoteOperationAsync(getActor(), + Future future = actorContext.executeOperationAsync(getActor(), new DataExists(path).toSerializable()); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java new file mode 100644 index 0000000000..302d684322 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/NotInitializedException.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.exceptions; + +public class NotInitializedException extends RuntimeException { + public NotInitializedException(String message) { + super(message); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java index c692881593..d65af61ba3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java @@ -10,11 +10,17 @@ package org.opendaylight.controller.cluster.datastore.identifiers; import com.google.common.base.Preconditions; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public class ShardIdentifier { private final String shardName; private final String memberName; private final String type; + //format and pattern should be in sync + private final String format = "%s-shard-%s-%s"; + private static final Pattern pattern = Pattern.compile("(\\S+)-shard-(\\S+)-(\\S+)"); public ShardIdentifier(String shardName, String memberName, String type) { @@ -60,15 +66,31 @@ public class ShardIdentifier { } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append(memberName).append("-shard-").append(shardName).append("-").append(type); - return builder.toString(); + //ensure the output of toString matches the pattern above + return new StringBuilder(memberName) + .append("-shard-") + .append(shardName) + .append("-") + .append(type) + .toString(); } public static Builder builder(){ return new Builder(); } + public String getShardName() { + return shardName; + } + + public String getMemberName() { + return memberName; + } + + public String getType() { + return type; + } + public static class Builder { private String shardName; private String memberName; @@ -93,5 +115,15 @@ public class ShardIdentifier { return this; } + public Builder fromShardIdString(String shardId){ + Matcher matcher = pattern.matcher(shardId); + + if (matcher.matches()) { + memberName = matcher.group(1); + shardName = matcher.group(2); + type = matcher.group(3); + } + return this; + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java new file mode 100644 index 0000000000..b034f87d1c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorInitialized.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.Serializable; + +public class ActorInitialized implements Serializable { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java new file mode 100644 index 0000000000..de25ef9ecc --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ActorNotInitialized.java @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.Serializable; + +public class ActorNotInitialized implements Serializable { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 8ba333d279..44f4ef77d7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -13,12 +13,14 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; -import akka.pattern.Patterns; import akka.util.Timeout; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; -import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -101,14 +103,17 @@ public class ActorContext { } /** - * Finds the primary for a given shard + * Finds the primary shard for the given shard name * * @param shardName * @return */ - public ActorSelection findPrimary(String shardName) { - String path = findPrimaryPath(shardName); - return actorSystem.actorSelection(path); + public Optional findPrimaryShard(String shardName) { + String path = findPrimaryPathOrNull(shardName); + if (path == null){ + return Optional.absent(); + } + return Optional.of(actorSystem.actorSelection(path)); } /** @@ -118,36 +123,36 @@ public class ActorContext { * @return a reference to a local shard actor which represents the shard * specified by the shardName */ - public ActorRef findLocalShard(String shardName) { - Object result = executeLocalOperation(shardManager, - new FindLocalShard(shardName)); + public Optional findLocalShard(String shardName) { + Object result = executeOperation(shardManager, new FindLocalShard(shardName)); if (result instanceof LocalShardFound) { LocalShardFound found = (LocalShardFound) result; - - if(LOG.isDebugEnabled()) { - LOG.debug("Local shard found {}", found.getPath()); - } - return found.getPath(); + LOG.debug("Local shard found {}", found.getPath()); + return Optional.of(found.getPath()); } - return null; + return Optional.absent(); } - public String findPrimaryPath(String shardName) { - Object result = executeLocalOperation(shardManager, - new FindPrimary(shardName).toSerializable()); + private String findPrimaryPathOrNull(String shardName) { + Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable()); if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) { PrimaryFound found = PrimaryFound.fromSerializable(result); - if(LOG.isDebugEnabled()) { - LOG.debug("Primary found {}", found.getPrimaryPath()); - } + LOG.debug("Primary found {}", found.getPrimaryPath()); return found.getPrimaryPath(); + + } else if (result.getClass().equals(ActorNotInitialized.class)){ + throw new NotInitializedException( + String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName) + ); + + } else { + return null; } - throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName); } @@ -158,16 +163,25 @@ public class ActorContext { * @param message * @return The response of the operation */ - public Object executeLocalOperation(ActorRef actor, Object message) { - Future future = ask(actor, message, operationTimeout); + public Object executeOperation(ActorRef actor, Object message) { + Future future = executeOperationAsync(actor, message, operationTimeout); try { return Await.result(future, operationDuration); } catch (Exception e) { - throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e); + throw new TimeoutException("Sending message " + message.getClass().toString() + + " to actor " + actor.toString() + " failed. Try again later.", e); } } + public Future executeOperationAsync(ActorRef actor, Object message, Timeout timeout) { + Preconditions.checkArgument(actor != null, "actor must not be null"); + Preconditions.checkArgument(message != null, "message must not be null"); + + LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + return ask(actor, message, timeout); + } + /** * Execute an operation on a remote actor and wait for it's response * @@ -175,19 +189,14 @@ public class ActorContext { * @param message * @return */ - public Object executeRemoteOperation(ActorSelection actor, Object message) { - - if(LOG.isDebugEnabled()) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), - actor.toString()); - } - Future future = ask(actor, message, operationTimeout); + public Object executeOperation(ActorSelection actor, Object message) { + Future future = executeOperationAsync(actor, message); try { return Await.result(future, operationDuration); } catch (Exception e) { throw new TimeoutException("Sending message " + message.getClass().toString() + - " to actor " + actor.toString() + " failed" , e); + " to actor " + actor.toString() + " failed. Try again later.", e); } } @@ -198,11 +207,12 @@ public class ActorContext { * @param message the message to send * @return a Future containing the eventual result */ - public Future executeRemoteOperationAsync(ActorSelection actor, Object message) { + public Future executeOperationAsync(ActorSelection actor, Object message) { + Preconditions.checkArgument(actor != null, "actor must not be null"); + Preconditions.checkArgument(message != null, "message must not be null"); + + LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); - if(LOG.isDebugEnabled()) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); - } return ask(actor, message, operationTimeout); } @@ -213,86 +223,15 @@ public class ActorContext { * @param actor the ActorSelection * @param message the message to send */ - public void sendRemoteOperationAsync(ActorSelection actor, Object message) { - actor.tell(message, ActorRef.noSender()); - } - - public void sendShardOperationAsync(String shardName, Object message) { - ActorSelection primary = findPrimary(shardName); - - primary.tell(message, ActorRef.noSender()); - } + public void sendOperationAsync(ActorSelection actor, Object message) { + Preconditions.checkArgument(actor != null, "actor must not be null"); + Preconditions.checkArgument(message != null, "message must not be null"); + LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); - /** - * Execute an operation on the primary for a given shard - *

      - * This method first finds the primary for a given shard ,then sends - * the message to the remote shard and waits for a response - *

      - * - * @param shardName - * @param message - * @return - * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out - * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found - */ - public Object executeShardOperation(String shardName, Object message) { - ActorSelection primary = findPrimary(shardName); - - return executeRemoteOperation(primary, message); - } - - /** - * Execute an operation on the the local shard only - *

      - * This method first finds the address of the local shard if any. It then - * executes the operation on it. - *

      - * - * @param shardName the name of the shard on which the operation needs to be executed - * @param message the message that needs to be sent to the shard - * @return the message that was returned by the local actor on which the - * the operation was executed. If a local shard was not found then - * null is returned - * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException - * if the operation does not complete in a specified time duration - */ - public Object executeLocalShardOperation(String shardName, Object message) { - ActorRef local = findLocalShard(shardName); - - if(local != null) { - return executeLocalOperation(local, message); - } - - return null; - } - - - /** - * Execute an operation on the the local shard only asynchronously - * - *

      - * This method first finds the address of the local shard if any. It then - * executes the operation on it. - *

      - * - * @param shardName the name of the shard on which the operation needs to be executed - * @param message the message that needs to be sent to the shard - * @param timeout the amount of time that this method should wait for a response before timing out - * @return null if the shard could not be located else a future on which the caller can wait - * - */ - public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) { - ActorRef local = findLocalShard(shardName); - if(local == null){ - return null; - } - return Patterns.ask(local, message, timeout); + actor.tell(message, ActorRef.noSender()); } - - public void shutdown() { shardManager.tell(PoisonPill.getInstance(), null); actorSystem.shutdown(); @@ -337,10 +276,13 @@ public class ActorContext { */ public void broadcast(Object message){ for(String shardName : configuration.getAllShardNames()){ - try { - sendShardOperationAsync(shardName, message); - } catch(Exception e){ - LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e); + + Optional primary = findPrimaryShard(shardName); + if (primary.isPresent()) { + primary.get().tell(message, ActorRef.noSender()); + } else { + LOG.warn("broadcast failed to send message {} to shard {}. Primary not found", + message.getClass().getSimpleName(), shardName); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java index 2ed11cfbda..c79d762035 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java @@ -82,7 +82,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest { ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages"); + .executeOperation(actorRef, "messages"); Assert.assertNotNull(messages); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java index ab3ff795d3..aaf080bdf7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java @@ -66,7 +66,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages"); + .executeOperation(actorRef, "messages"); assertNotNull(messages); @@ -95,7 +95,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages"); + .executeOperation(actorRef, "messages"); assertNotNull(messages); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 08c3ea9602..d57a5eea4a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -8,6 +8,7 @@ import akka.actor.Props; import akka.dispatch.ExecutionContexts; import akka.dispatch.Futures; import akka.util.Timeout; +import com.google.common.base.Optional; import com.google.common.util.concurrent.MoreExecutors; import org.junit.After; import org.junit.Before; @@ -97,11 +98,11 @@ public class DistributedDataStoreTest extends AbstractActorTest{ ListenerRegistration registration = distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { - @Override - public void onDataChanged(AsyncDataChangeEvent> change) { - throw new UnsupportedOperationException("onDataChanged"); - } - }, AsyncDataBroker.DataChangeScope.BASE); + @Override + public void onDataChanged(AsyncDataChangeEvent> change) { + throw new UnsupportedOperationException("onDataChanged"); + } + }, AsyncDataBroker.DataChangeScope.BASE); // Since we do not expect the shard to be local registration will return a NoOpRegistration assertTrue(registration instanceof NoOpDataChangeListenerRegistration); @@ -119,8 +120,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{ Future future = mock(Future.class); when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS)); when(actorContext.getActorSystem()).thenReturn(getSystem()); + when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef)); when(actorContext - .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(future); + .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future); ListenerRegistration registration = distributedDataStore.registerChangeListener(TestModel.TEST_PATH, @@ -153,8 +155,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{ when(actorSystem.dispatcher()).thenReturn(executor); when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef); when(actorContext.getActorSystem()).thenReturn(actorSystem); + when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef)); when(actorContext - .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f); + .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f); when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection); ListenerRegistration registration = @@ -195,8 +198,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{ when(actorSystem.dispatcher()).thenReturn(executor); when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef); when(actorContext.getActorSystem()).thenReturn(actorSystem); + when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef)); when(actorContext - .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f); + .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f); when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection); ListenerRegistration registration = diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 8a3cdd0c8a..ed7b6866bf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -22,6 +22,8 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; @@ -29,6 +31,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -53,6 +56,8 @@ import static org.mockito.Mockito.when; public class ShardManagerTest { private static ActorSystem system; + Configuration mockConfig = new MockConfiguration(); + private static ActorRef defaultShardMockActor; @BeforeClass public static void setUpClass() { @@ -60,13 +65,18 @@ public class ShardManagerTest { myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal"); myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher"); Config config = ConfigFactory.load() - .withValue("akka.persistence.journal.plugin", - ConfigValueFactory.fromAnyRef("my-journal")) - .withValue("my-journal", ConfigValueFactory.fromMap(myJournal)); + .withValue("akka.persistence.journal.plugin", + ConfigValueFactory.fromAnyRef("my-journal")) + .withValue("my-journal", ConfigValueFactory.fromMap(myJournal)); MyJournal.clear(); system = ActorSystem.create("test", config); + + String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString(); + defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name); + + } @AfterClass @@ -86,15 +96,15 @@ public class ShardManagerTest { new JavaTestKit(system) { { final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); subject.tell(new FindPrimary("inventory").toSerializable(), getRef()); expectMsgEquals(duration("2 seconds"), - new PrimaryNotFound("inventory").toSerializable()); + new PrimaryNotFound("inventory").toSerializable()); }}; } @@ -103,17 +113,19 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + subject.tell(new ActorInitialized(), defaultShardMockActor); subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef()); expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS); - }}; + } + }; } @Test @@ -121,8 +133,8 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); @@ -150,12 +162,13 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", mockClusterWrapper, - new MockConfiguration(), new DatastoreContext()); + .props("config", mockClusterWrapper, + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); + subject.tell(new ActorInitialized(), defaultShardMockActor); subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); @@ -171,7 +184,7 @@ public class ShardManagerTest { }.get(); // this extracts the received message assertTrue(out.path().toString(), - out.path().toString().contains("member-1-shard-default-config")); + out.path().toString().contains("member-1-shard-default-config")); }}; } @@ -180,8 +193,8 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); @@ -211,8 +224,8 @@ public class ShardManagerTest { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); @@ -233,14 +246,14 @@ public class ShardManagerTest { @Test public void testOnRecoveryJournalIsEmptied(){ MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules( - ImmutableSet.of("foo"))); + ImmutableSet.of("foo"))); assertEquals(1, MyJournal.get().size()); new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final ActorRef subject = getSystem().actorOf(props); @@ -257,10 +270,10 @@ public class ShardManagerTest { public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final TestActorRef subject = - TestActorRef.create(system, props); + TestActorRef.create(system, props); subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo"))); @@ -272,13 +285,13 @@ public class ShardManagerTest { @Test public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules() - throws Exception { + throws Exception { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final TestActorRef subject = - TestActorRef.create(system, props); + TestActorRef.create(system, props); Collection knownModules = subject.underlyingActor().getKnownModules(); @@ -318,13 +331,13 @@ public class ShardManagerTest { @Test public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules() - throws Exception { + throws Exception { new JavaTestKit(system) {{ final Props props = ShardManager - .props("config", new MockClusterWrapper(), - new MockConfiguration(), new DatastoreContext()); + .props("config", new MockClusterWrapper(), + new MockConfiguration(), new DatastoreContext()); final TestActorRef subject = - TestActorRef.create(system, props); + TestActorRef.create(system, props); Collection knownModules = subject.underlyingActor().getKnownModules(); @@ -386,7 +399,7 @@ public class ShardManagerTest { } @Override public Future doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max, - final Procedure replayCallback) { + final Procedure replayCallback) { if(journal.size() == 0){ return Futures.successful(null); } @@ -395,8 +408,8 @@ public class ShardManagerTest { public Void call() throws Exception { for (Map.Entry entry : journal.entrySet()) { PersistentRepr persistentMessage = - new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, - false, null, null); + new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId, + false, null, null); replayCallback.apply(persistentMessage); } return null; @@ -409,7 +422,7 @@ public class ShardManagerTest { } @Override public Future doAsyncWriteMessages( - final Iterable persistentReprs) { + final Iterable persistentReprs) { return Futures.future(new Callable() { @Override public Void call() throws Exception { @@ -424,12 +437,12 @@ public class ShardManagerTest { } @Override public Future doAsyncWriteConfirmations( - Iterable persistentConfirmations) { + Iterable persistentConfirmations) { return Futures.successful(null); } @Override public Future doAsyncDeleteMessages(Iterable persistentIds, - boolean b) { + boolean b) { clear(); return Futures.successful(null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java index 1cd0f85fa1..3c9d857fe8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java @@ -91,12 +91,12 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest { .successful(((SerializableMessage) responses[i]).toSerializable())); } - stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class), + stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class), isA(requestType)); } private void verifyCohortInvocations(int nCohorts, Class requestType) { - verify(actorContext, times(nCohorts)).executeRemoteOperationAsync( + verify(actorContext, times(nCohorts)).executeOperationAsync( any(ActorSelection.class), isA(requestType)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index e5392e0251..bdcca42d15 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,28 +1,17 @@ package org.opendaylight.controller.cluster.datastore; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; - import com.google.common.base.Optional; import com.google.common.util.concurrent.CheckedFuture; - import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; - import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; @@ -52,22 +41,29 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; + import java.util.List; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.isA; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; @SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { @@ -224,11 +220,17 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); doReturn(getSystem().actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); + + doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). - executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + executeOperation(eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); return actorRef; @@ -252,7 +254,7 @@ public class TransactionProxyTest extends AbstractActorTest { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); Optional> readOptional = transactionProxy.read( @@ -262,7 +264,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -277,7 +279,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -290,7 +292,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -300,12 +302,17 @@ public class TransactionProxyTest extends AbstractActorTest { private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Throwable { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); - doThrow(exToThrow).when(mockActorContext).executeShardOperation( - anyString(), any()); + if (exToThrow instanceof PrimaryNotFoundException) { + doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); + } else { + doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShard(anyString()); + } + doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); } @@ -341,13 +348,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -360,7 +367,7 @@ public class TransactionProxyTest extends AbstractActorTest { try { propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } finally { - verify(mockActorContext, times(0)).executeRemoteOperationAsync( + verify(mockActorContext, times(0)).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); } } @@ -371,10 +378,10 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(expectedNode)); - doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -406,14 +413,14 @@ public class TransactionProxyTest extends AbstractActorTest { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); - doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDataExists()); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); - doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDataExists()); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -436,7 +443,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -449,7 +456,7 @@ public class TransactionProxyTest extends AbstractActorTest { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any()); + executeOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); @@ -463,13 +470,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); - doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -482,7 +489,7 @@ public class TransactionProxyTest extends AbstractActorTest { try { propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); } finally { - verify(mockActorContext, times(0)).executeRemoteOperationAsync( + verify(mockActorContext, times(0)).executeOperationAsync( eq(actorSelection(actorRef)), eqDataExists()); } } @@ -493,10 +500,10 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -547,7 +554,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -555,7 +562,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).executeRemoteOperationAsync( + verify(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), @@ -590,7 +597,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -598,7 +605,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).executeRemoteOperationAsync( + verify(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), @@ -609,7 +616,7 @@ public class TransactionProxyTest extends AbstractActorTest { public void testDelete() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDeleteData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -617,7 +624,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.delete(TestModel.TEST_PATH); - verify(mockActorContext).executeRemoteOperationAsync( + verify(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqDeleteData()); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), @@ -656,13 +663,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -691,13 +698,13 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -726,11 +733,11 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); doReturn(Futures.failed(new TestException())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), + executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -753,8 +760,9 @@ public class TransactionProxyTest extends AbstractActorTest { @Test public void testReadyWithInitialCreateTransactionFailure() throws Exception { - doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( - anyString(), any()); + doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); +// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( +// anyString(), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -783,11 +791,11 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(eq(actorSelection(actorRef)), + executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -820,7 +828,7 @@ public class TransactionProxyTest extends AbstractActorTest { public void testClose() throws Exception{ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, @@ -830,7 +838,7 @@ public class TransactionProxyTest extends AbstractActorTest { transactionProxy.close(); - verify(mockActorContext).sendRemoteOperationAsync( + verify(mockActorContext).sendOperationAsync( eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java index afcd045434..0b5e7132c7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java @@ -14,5 +14,14 @@ public class ShardIdentifierTest { assertEquals("member-1-shard-inventory-config", id.toString()); } + @Test + public void testFromShardIdString(){ + String shardIdStr = "member-1-shard-inventory-config"; + + ShardIdentifier id = ShardIdentifier.builder().fromShardIdString(shardIdStr).build(); + assertEquals("member-1", id.getMemberName()); + assertEquals("inventory", id.getShardName()); + assertEquals("config", id.getType()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 5d8fb8393d..fa6d0b060f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,6 +1,5 @@ package org.opendaylight.controller.cluster.datastore.utils; -import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -8,7 +7,7 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import akka.testkit.JavaTestKit; - +import com.google.common.base.Optional; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; @@ -16,12 +15,14 @@ import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; public class ActorContextTest extends AbstractActorTest{ @@ -100,63 +101,6 @@ public class ActorContextTest extends AbstractActorTest{ } } - @Test - public void testExecuteLocalShardOperationWithShardFound(){ - new JavaTestKit(getSystem()) {{ - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class)); - - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(true, shardActorRef)); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); - - Object out = actorContext.executeLocalShardOperation("default", "hello"); - - assertEquals("hello", out); - - - expectNoMsg(); - } - }; - }}; - - } - - @Test - public void testExecuteLocalShardOperationWithShardNotFound(){ - new JavaTestKit(getSystem()) {{ - - new Within(duration("1 seconds")) { - @Override - protected void run() { - - ActorRef shardManagerActorRef = getSystem() - .actorOf(MockShardManager.props(false, null)); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), - mock(Configuration.class)); - - Object out = actorContext.executeLocalShardOperation("default", "hello"); - - assertNull(out); - - - expectNoMsg(); - } - }; - }}; - - } - - @Test public void testFindLocalShardWithShardFound(){ new JavaTestKit(getSystem()) {{ @@ -174,9 +118,9 @@ public class ActorContextTest extends AbstractActorTest{ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Object out = actorContext.findLocalShard("default"); + Optional out = actorContext.findLocalShard("default"); - assertEquals(shardActorRef, out); + assertEquals(shardActorRef, out.get()); expectNoMsg(); @@ -201,11 +145,8 @@ public class ActorContextTest extends AbstractActorTest{ new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class), mock(Configuration.class)); - Object out = actorContext.findLocalShard("default"); - - assertNull(out); - - + Optional out = actorContext.findLocalShard("default"); + assertTrue(!out.isPresent()); expectNoMsg(); } }; @@ -232,7 +173,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Object out = actorContext.executeRemoteOperation(actor, "hello"); + Object out = actorContext.executeOperation(actor, "hello"); assertEquals("hello", out); @@ -261,7 +202,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorSelection actor = actorContext.actorSelection(shardActorRef.path()); - Future future = actorContext.executeRemoteOperationAsync(actor, "hello"); + Future future = actorContext.executeOperationAsync(actor, "hello"); try { Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java index 8fa3a17f90..81b6bccaf0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java @@ -12,6 +12,7 @@ import static org.junit.Assert.assertNotNull; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import com.google.common.base.Optional; public class MockActorContext extends ActorContext { @@ -30,19 +31,13 @@ public class MockActorContext extends ActorContext { super(actorSystem, shardManager, new MockClusterWrapper(), new MockConfiguration()); } - - @Override public Object executeShardOperation(String shardName, - Object message) { - return executeShardOperationResponse; - } - - @Override public Object executeRemoteOperation(ActorSelection actor, - Object message) { + @Override public Object executeOperation(ActorSelection actor, + Object message) { return executeRemoteOperationResponse; } - @Override public ActorSelection findPrimary(String shardName) { - return null; + @Override public Optional findPrimaryShard(String shardName) { + return Optional.absent(); } public void setExecuteShardOperationResponse(Object response){ @@ -74,14 +69,9 @@ public class MockActorContext extends ActorContext { } @Override - public Object executeLocalOperation(ActorRef actor, - Object message) { + public Object executeOperation(ActorRef actor, + Object message) { return this.executeLocalOperationResponse; } - @Override - public Object executeLocalShardOperation(String shardName, - Object message) { - return this.executeLocalShardOperationResponse; - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java index 4ddba2f1b9..3bad468950 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java @@ -21,7 +21,7 @@ public class TestUtils { ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf( Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration()); Object messages = testContext - .executeLocalOperation(actorRef, "messages"); + .executeOperation(actorRef, "messages"); Assert.assertNotNull(messages); -- 2.36.6