1. Shard manager creates Shard and mark them un-initialized.
Shard completes recovery and onRecoveryComplete, sends a
message to Shard manager to mark it initialized.
If a request for Shard comes to Shard manager and the
shard is not initialized, it sends ActorNotInitialized
message.
2. Normalizes and refactors ActorContext.
3. Adds AbstractUntypedPersistentActorWithMetering to meter
ShardManager.
Change-Id: Ibf15a2ef56422bda53067039d2271a719b6b2ce3
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.common.actor;
+
+/**
+ * Actor with its behaviour metered. Metering is enabled by configuration.
+ */
+public abstract class AbstractUntypedPersistentActorWithMetering extends AbstractUntypedPersistentActor {
+
+ public AbstractUntypedPersistentActorWithMetering() {
+ if (isMetricsCaptureEnabled())
+ getContext().become(new MeteringBehavior(this));
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
+ }
+}
import akka.actor.ActorSystem;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
Preconditions.checkNotNull(path, "path should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
- if(LOG.isDebugEnabled()) {
- LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
- }
- ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataChangeListener.props(listener ));
+
+ LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
- Future future = actorContext.executeLocalShardOperationAsync(shardName,
- new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
- new Timeout(actorContext.getOperationDuration().$times(
- REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
+ Optional<ActorRef> shard = actorContext.findLocalShard(shardName);
- if (future != null) {
- final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
+ //if shard is NOT local
+ if (!shard.isPresent()) {
+ LOG.debug("No local shard for shardName {} was found so returning a noop registration", shardName);
+ return new NoOpDataChangeListenerRegistration(listener);
+ }
+ //if shard is local
+ ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props(listener));
+ Future future = actorContext.executeOperationAsync(shard.get(),
+ new RegisterChangeListener(path, dataChangeListenerActor.path(), scope),
+ new Timeout(actorContext.getOperationDuration().$times(REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR)));
+
+ final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
new DataChangeListenerRegistrationProxy(listener, dataChangeListenerActor);
- future.onComplete(new OnComplete(){
+ future.onComplete(new OnComplete() {
- @Override public void onComplete(Throwable failure, Object result)
+ @Override
+ public void onComplete(Throwable failure, Object result)
throws Throwable {
- if(failure != null){
- LOG.error("Failed to register listener at path " + path.toString(), failure);
- return;
- }
- RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
- listenerRegistrationProxy.setListenerRegistrationActor(actorContext
- .actorSelection(reply.getListenerRegistrationPath()));
+ if (failure != null) {
+ LOG.error("Failed to register listener at path " + path.toString(), failure);
+ return;
}
- }, actorContext.getActorSystem().dispatcher());
- return listenerRegistrationProxy;
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "No local shard for shardName {} was found so returning a noop registration",
- shardName);
- }
- return new NoOpDataChangeListenerRegistration(listener);
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ listenerRegistrationProxy.setListenerRegistrationActor(actorContext
+ .actorSelection(reply.getListenerRegistrationPath()));
+ }
+ }, actorContext.getActorSystem().dispatcher());
+
+ return listenerRegistrationProxy;
+
}
@Override
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
getLeader().forward(message, getContext());
} else {
getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
- "Could not find leader so transaction cannot be created")), getSelf());
+ "Could not find shard leader so transaction cannot be created. This typically happens" +
+ " when system is coming up or recovering and a leader is being elected. Try again" +
+ " later.")), getSelf());
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
recoveryCoordinator = null;
currentLogRecoveryBatch = null;
updateJournalStats();
+
+ //notify shard manager
+ getContext().parent().tell(new ActorInitialized(), getSelf());
}
@Override
import akka.persistence.RecoveryFailure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
* <li> Monitor the cluster members and store their addresses
* <ul>
*/
-public class ShardManager extends AbstractUntypedPersistentActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
}
+ private void onActorInitialized(Object message) {
+ final ActorRef sender = getSender();
+
+ if (sender == null) {
+ return; //why is a non-actor sending this message? Just ignore.
+ }
+
+ String actorName = sender.path().name();
+ //find shard name from actor name; actor name is stringified shardId
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+ if (shardId.getShardName() == null) {
+ return;
+ }
+ markShardAsInitialized(shardId.getShardName());
+ }
+
+ @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+ LOG.debug("Initializing shard [{}]", shardName);
+ ShardInformation shardInformation = localShards.get(shardName);
+ if (shardInformation != null) {
+ shardInformation.setShardInitialized(true);
+ }
+ }
+
@Override protected void handleRecover(Object message) throws Exception {
if(message instanceof SchemaContextModules){
}
private void findLocalShard(FindLocalShard message) {
- ShardInformation shardInformation =
- localShards.get(message.getShardName());
+ ShardInformation shardInformation = localShards.get(message.getShardName());
- if(shardInformation != null){
- getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+ if(shardInformation == null){
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}
- getSender().tell(new LocalShardNotFound(message.getShardName()),
- getSelf());
+ sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+ }
+
+ private void sendResponse(ShardInformation shardInformation, Object message) {
+ if (!shardInformation.isShardInitialized()) {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ return;
+ }
+
+ getSender().tell(message, getSelf());
}
private void memberRemoved(ClusterEvent.MemberRemoved message) {
private void memberUp(ClusterEvent.MemberUp message) {
String memberName = message.member().roles().head();
- memberNameToAddress.put(memberName , message.member().address());
+ memberNameToAddress.put(memberName, message.member().address());
for(ShardInformation info : localShards.values()){
String shardName = info.getShardName();
}
private void findPrimary(FindPrimary message) {
+ final ActorRef sender = getSender();
String shardName = message.getShardName();
// First see if the there is a local replica for the shard
ShardInformation info = localShards.get(shardName);
- if(info != null) {
+ if (info != null) {
ActorPath shardPath = info.getActorPath();
- if (shardPath != null) {
- getSender()
- .tell(
- new PrimaryFound(shardPath.toString()).toSerializable(),
- getSelf());
- return;
- }
+ sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+ return;
}
- List<String> members =
- configuration.getMembersFromShardName(shardName);
+ List<String> members = configuration.getMembersFromShardName(shardName);
if(cluster.getCurrentMemberName() != null) {
members.remove(cluster.getCurrentMemberName());
}
+ /**
+ * FIXME: Instead of sending remote shard actor path back to sender,
+ * forward FindPrimary message to remote shard manager
+ */
// There is no way for us to figure out the primary (for now) so assume
// that one of the remote nodes is a primary
for(String memberName : members) {
private final ActorRef actor;
private final ActorPath actorPath;
private final Map<ShardIdentifier, String> peerAddresses;
+ private boolean shardInitialized = false; //flag that determines if the actor is ready for business
private ShardInformation(String shardName, ActorRef actor,
Map<ShardIdentifier, String> peerAddresses) {
}
}
+
+ public boolean isShardInitialized() {
+ return shardInitialized;
+ }
+
+ public void setShardInitialized(boolean shardInitialized) {
+ this.shardInitialized = shardInitialized;
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {
}
ActorSelection cohort = actorContext.actorSelection(actorPath);
- futureList.add(actorContext.executeRemoteOperationAsync(cohort, message));
+ futureList.add(actorContext.executeOperationAsync(cohort, message));
}
return Futures.sequence(futureList, actorContext.getActorSystem().dispatcher());
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
if(remoteTransactionActorsMB.get()) {
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
- actorContext.sendRemoteOperationAsync(actor,
+ actorContext.sendOperationAsync(actor,
new CloseTransaction().toSerializable());
}
}
}
try {
- Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
+ Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
+ if (!primaryShard.isPresent()) {
+ throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+ }
+
+ Object response = actorContext.executeOperation(primaryShard.get(),
+ new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} closeTransaction called", identifier);
}
- actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
}
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
+ final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
new ReadyTransaction().toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new DeleteData(path).toSerializable() ));
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+ new DeleteData(path).toSerializable()));
}
@Override
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
new MergeData(path, data, schemaContext).toSerializable()));
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
new WriteData(path, data, schemaContext).toSerializable()));
}
}
};
- Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
new ReadData(path).toSerializable());
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
};
- Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> future = actorContext.executeOperationAsync(getActor(),
new DataExists(path).toSerializable());
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class NotInitializedException extends RuntimeException {
+ public NotInitializedException(String message) {
+ super(message);
+ }
+}
import com.google.common.base.Preconditions;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public class ShardIdentifier {
private final String shardName;
private final String memberName;
private final String type;
+ //format and pattern should be in sync
+ private final String format = "%s-shard-%s-%s";
+ private static final Pattern pattern = Pattern.compile("(\\S+)-shard-(\\S+)-(\\S+)");
public ShardIdentifier(String shardName, String memberName, String type) {
}
@Override public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(memberName).append("-shard-").append(shardName).append("-").append(type);
- return builder.toString();
+ //ensure the output of toString matches the pattern above
+ return new StringBuilder(memberName)
+ .append("-shard-")
+ .append(shardName)
+ .append("-")
+ .append(type)
+ .toString();
}
public static Builder builder(){
return new Builder();
}
+ public String getShardName() {
+ return shardName;
+ }
+
+ public String getMemberName() {
+ return memberName;
+ }
+
+ public String getType() {
+ return type;
+ }
+
public static class Builder {
private String shardName;
private String memberName;
return this;
}
+ public Builder fromShardIdString(String shardId){
+ Matcher matcher = pattern.matcher(shardId);
+
+ if (matcher.matches()) {
+ memberName = matcher.group(1);
+ shardName = matcher.group(2);
+ type = matcher.group(3);
+ }
+ return this;
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+public class ActorInitialized implements Serializable {
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+public class ActorNotInitialized implements Serializable {
+}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
-import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
}
/**
- * Finds the primary for a given shard
+ * Finds the primary shard for the given shard name
*
* @param shardName
* @return
*/
- public ActorSelection findPrimary(String shardName) {
- String path = findPrimaryPath(shardName);
- return actorSystem.actorSelection(path);
+ public Optional<ActorSelection> findPrimaryShard(String shardName) {
+ String path = findPrimaryPathOrNull(shardName);
+ if (path == null){
+ return Optional.absent();
+ }
+ return Optional.of(actorSystem.actorSelection(path));
}
/**
* @return a reference to a local shard actor which represents the shard
* specified by the shardName
*/
- public ActorRef findLocalShard(String shardName) {
- Object result = executeLocalOperation(shardManager,
- new FindLocalShard(shardName));
+ public Optional<ActorRef> findLocalShard(String shardName) {
+ Object result = executeOperation(shardManager, new FindLocalShard(shardName));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Local shard found {}", found.getPath());
- }
- return found.getPath();
+ LOG.debug("Local shard found {}", found.getPath());
+ return Optional.of(found.getPath());
}
- return null;
+ return Optional.absent();
}
- public String findPrimaryPath(String shardName) {
- Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable());
+ private String findPrimaryPathOrNull(String shardName) {
+ Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Primary found {}", found.getPrimaryPath());
- }
+ LOG.debug("Primary found {}", found.getPrimaryPath());
return found.getPrimaryPath();
+
+ } else if (result.getClass().equals(ActorNotInitialized.class)){
+ throw new NotInitializedException(
+ String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
+ );
+
+ } else {
+ return null;
}
- throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
}
* @param message
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message) {
- Future<Object> future = ask(actor, message, operationTimeout);
+ public Object executeOperation(ActorRef actor, Object message) {
+ Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
try {
return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed. Try again later.", e);
}
}
+ public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+ Preconditions.checkArgument(message != null, "message must not be null");
+
+ LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
+ return ask(actor, message, timeout);
+ }
+
/**
* Execute an operation on a remote actor and wait for it's response
*
* @param message
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
- actor.toString());
- }
- Future<Object> future = ask(actor, message, operationTimeout);
+ public Object executeOperation(ActorSelection actor, Object message) {
+ Future<Object> future = executeOperationAsync(actor, message);
try {
return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() +
- " to actor " + actor.toString() + " failed" , e);
+ " to actor " + actor.toString() + " failed. Try again later.", e);
}
}
* @param message the message to send
* @return a Future containing the eventual result
*/
- public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
+ public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+ Preconditions.checkArgument(message != null, "message must not be null");
+
+ LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- }
return ask(actor, message, operationTimeout);
}
* @param actor the ActorSelection
* @param message the message to send
*/
- public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
- actor.tell(message, ActorRef.noSender());
- }
-
- public void sendShardOperationAsync(String shardName, Object message) {
- ActorSelection primary = findPrimary(shardName);
-
- primary.tell(message, ActorRef.noSender());
- }
+ public void sendOperationAsync(ActorSelection actor, Object message) {
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+ Preconditions.checkArgument(message != null, "message must not be null");
+ LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
- /**
- * Execute an operation on the primary for a given shard
- * <p>
- * This method first finds the primary for a given shard ,then sends
- * the message to the remote shard and waits for a response
- * </p>
- *
- * @param shardName
- * @param message
- * @return
- * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
- * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
- */
- public Object executeShardOperation(String shardName, Object message) {
- ActorSelection primary = findPrimary(shardName);
-
- return executeRemoteOperation(primary, message);
- }
-
- /**
- * Execute an operation on the the local shard only
- * <p>
- * This method first finds the address of the local shard if any. It then
- * executes the operation on it.
- * </p>
- *
- * @param shardName the name of the shard on which the operation needs to be executed
- * @param message the message that needs to be sent to the shard
- * @return the message that was returned by the local actor on which the
- * the operation was executed. If a local shard was not found then
- * null is returned
- * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
- * if the operation does not complete in a specified time duration
- */
- public Object executeLocalShardOperation(String shardName, Object message) {
- ActorRef local = findLocalShard(shardName);
-
- if(local != null) {
- return executeLocalOperation(local, message);
- }
-
- return null;
- }
-
-
- /**
- * Execute an operation on the the local shard only asynchronously
- *
- * <p>
- * This method first finds the address of the local shard if any. It then
- * executes the operation on it.
- * </p>
- *
- * @param shardName the name of the shard on which the operation needs to be executed
- * @param message the message that needs to be sent to the shard
- * @param timeout the amount of time that this method should wait for a response before timing out
- * @return null if the shard could not be located else a future on which the caller can wait
- *
- */
- public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
- ActorRef local = findLocalShard(shardName);
- if(local == null){
- return null;
- }
- return Patterns.ask(local, message, timeout);
+ actor.tell(message, ActorRef.noSender());
}
-
-
public void shutdown() {
shardManager.tell(PoisonPill.getInstance(), null);
actorSystem.shutdown();
*/
public void broadcast(Object message){
for(String shardName : configuration.getAllShardNames()){
- try {
- sendShardOperationAsync(shardName, message);
- } catch(Exception e){
- LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e);
+
+ Optional<ActorSelection> primary = findPrimaryShard(shardName);
+ if (primary.isPresent()) {
+ primary.get().tell(message, ActorRef.noSender());
+ } else {
+ LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
+ message.getClass().getSimpleName(), shardName);
}
}
}
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages");
+ .executeOperation(actorRef, "messages");
Assert.assertNotNull(messages);
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages");
+ .executeOperation(actorRef, "messages");
assertNotNull(messages);
ActorContext
testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages");
+ .executeOperation(actorRef, "messages");
assertNotNull(messages);
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.util.Timeout;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
ListenerRegistration registration =
distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- throw new UnsupportedOperationException("onDataChanged");
- }
- }, AsyncDataBroker.DataChangeScope.BASE);
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+ throw new UnsupportedOperationException("onDataChanged");
+ }
+ }, AsyncDataBroker.DataChangeScope.BASE);
// Since we do not expect the shard to be local registration will return a NoOpRegistration
assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
Future future = mock(Future.class);
when(actorContext.getOperationDuration()).thenReturn(FiniteDuration.apply(5, TimeUnit.SECONDS));
when(actorContext.getActorSystem()).thenReturn(getSystem());
+ when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
when(actorContext
- .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(future);
+ .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(future);
ListenerRegistration registration =
distributedDataStore.registerChangeListener(TestModel.TEST_PATH,
when(actorSystem.dispatcher()).thenReturn(executor);
when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
when(actorContext
- .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
ListenerRegistration registration =
when(actorSystem.dispatcher()).thenReturn(executor);
when(actorSystem.actorOf(any(Props.class))).thenReturn(doNothingActorRef);
when(actorContext.getActorSystem()).thenReturn(actorSystem);
+ when(actorContext.findLocalShard(anyString())).thenReturn(Optional.of(doNothingActorRef));
when(actorContext
- .executeLocalShardOperationAsync(anyString(), anyObject(), any(Timeout.class))).thenReturn(f);
+ .executeOperationAsync(eq(doNothingActorRef), anyObject(), any(Timeout.class))).thenReturn(f);
when(actorContext.actorSelection(any(ActorPath.class))).thenReturn(actorSelection);
ListenerRegistration registration =
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
public class ShardManagerTest {
private static ActorSystem system;
+ Configuration mockConfig = new MockConfiguration();
+ private static ActorRef defaultShardMockActor;
@BeforeClass
public static void setUpClass() {
myJournal.put("class", "org.opendaylight.controller.cluster.datastore.ShardManagerTest$MyJournal");
myJournal.put("plugin-dispatcher", "akka.actor.default-dispatcher");
Config config = ConfigFactory.load()
- .withValue("akka.persistence.journal.plugin",
- ConfigValueFactory.fromAnyRef("my-journal"))
- .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
+ .withValue("akka.persistence.journal.plugin",
+ ConfigValueFactory.fromAnyRef("my-journal"))
+ .withValue("my-journal", ConfigValueFactory.fromMap(myJournal));
MyJournal.clear();
system = ActorSystem.create("test", config);
+
+ String name = new ShardIdentifier(Shard.DEFAULT_NAME, "member-1","config").toString();
+ defaultShardMockActor = system.actorOf(Props.create(DoNothingActor.class), name);
+
+
}
@AfterClass
new JavaTestKit(system) {
{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
expectMsgEquals(duration("2 seconds"),
- new PrimaryNotFound("inventory").toSerializable());
+ new PrimaryNotFound("inventory").toSerializable());
}};
}
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ subject.tell(new ActorInitialized(), defaultShardMockActor);
subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
expectMsgClass(duration("1 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
- }};
+ }
+ };
}
@Test
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", mockClusterWrapper,
- new MockConfiguration(), new DatastoreContext());
+ .props("config", mockClusterWrapper,
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+ subject.tell(new ActorInitialized(), defaultShardMockActor);
subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
}.get(); // this extracts the received message
assertTrue(out.path().toString(),
- out.path().toString().contains("member-1-shard-default-config"));
+ out.path().toString().contains("member-1-shard-default-config"));
}};
}
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
@Test
public void testOnRecoveryJournalIsEmptied(){
MyJournal.addToJournal(1L, new ShardManager.SchemaContextModules(
- ImmutableSet.of("foo")));
+ ImmutableSet.of("foo")));
assertEquals(1, MyJournal.get().size());
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final ActorRef subject = getSystem().actorOf(props);
public void testOnRecoveryPreviouslyKnownModulesAreDiscovered() throws Exception {
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+ TestActorRef.create(system, props);
subject.underlyingActor().onReceiveRecover(new ShardManager.SchemaContextModules(ImmutableSet.of("foo")));
@Test
public void testOnUpdateSchemaContextUpdateKnownModulesIfTheyContainASuperSetOfTheKnownModules()
- throws Exception {
+ throws Exception {
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+ TestActorRef.create(system, props);
Collection<String> knownModules = subject.underlyingActor().getKnownModules();
@Test
public void testOnUpdateSchemaContextDoNotUpdateKnownModulesIfTheyDoNotContainASuperSetOfKnownModules()
- throws Exception {
+ throws Exception {
new JavaTestKit(system) {{
final Props props = ShardManager
- .props("config", new MockClusterWrapper(),
- new MockConfiguration(), new DatastoreContext());
+ .props("config", new MockClusterWrapper(),
+ new MockConfiguration(), new DatastoreContext());
final TestActorRef<ShardManager> subject =
- TestActorRef.create(system, props);
+ TestActorRef.create(system, props);
Collection<String> knownModules = subject.underlyingActor().getKnownModules();
}
@Override public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
- final Procedure<PersistentRepr> replayCallback) {
+ final Procedure<PersistentRepr> replayCallback) {
if(journal.size() == 0){
return Futures.successful(null);
}
public Void call() throws Exception {
for (Map.Entry<Long, Object> entry : journal.entrySet()) {
PersistentRepr persistentMessage =
- new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
- false, null, null);
+ new PersistentImpl(entry.getValue(), entry.getKey(), persistenceId,
+ false, null, null);
replayCallback.apply(persistentMessage);
}
return null;
}
@Override public Future<Void> doAsyncWriteMessages(
- final Iterable<PersistentRepr> persistentReprs) {
+ final Iterable<PersistentRepr> persistentReprs) {
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
}
@Override public Future<Void> doAsyncWriteConfirmations(
- Iterable<PersistentConfirmation> persistentConfirmations) {
+ Iterable<PersistentConfirmation> persistentConfirmations) {
return Futures.successful(null);
}
@Override public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> persistentIds,
- boolean b) {
+ boolean b) {
clear();
return Futures.successful(null);
}
.successful(((SerializableMessage) responses[i]).toSerializable()));
}
- stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
+ stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
isA(requestType));
}
private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
- verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
+ verify(actorContext, times(nCohorts)).executeOperationAsync(
any(ActorSelection.class), isA(requestType));
}
package org.opendaylight.controller.cluster.datastore;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
-
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
-
import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
+import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
@SuppressWarnings("resource")
public class TransactionProxyTest extends AbstractActorTest {
ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
doReturn(getSystem().actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
+
+ doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
- executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD),
+ executeOperation(eq(getSystem().actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
+
doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
anyString(), eq(actorRef.path().toString()));
+
doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
return actorRef;
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker)
throws Throwable {
+ ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
- doThrow(exToThrow).when(mockActorContext).executeShardOperation(
- anyString(), any());
+ if (exToThrow instanceof PrimaryNotFoundException) {
+ doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+ } else {
+ doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))).
+ when(mockActorContext).findPrimaryShard(anyString());
+ }
+ doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any());
- TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
- READ_ONLY);
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
try {
propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
} finally {
- verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ verify(mockActorContext, times(0)).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
}
}
NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(expectedNode));
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
- doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
assertEquals("Exists response", false, exists);
- doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
setupActorContextWithInitialCreateTransaction(READ_ONLY);
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(any(ActorSelection.class), any());
+ executeOperationAsync(any(ActorSelection.class), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
READ_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
+ executeOperationAsync(eq(actorSelection(actorRef)), eqDeleteData());
- doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
try {
propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
} finally {
- verify(mockActorContext, times(0)).executeRemoteOperationAsync(
+ verify(mockActorContext, times(0)).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
}
}
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDataExists());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
public void testDelete() throws Exception {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
- doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDeleteData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.delete(TestModel.TEST_PATH);
- verify(mockActorContext).executeRemoteOperationAsync(
+ verify(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqDeleteData());
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+ executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
doReturn(Futures.failed(new TestException())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
@Test
public void testReadyWithInitialCreateTransactionFailure() throws Exception {
- doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
- anyString(), any());
+ doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+// anyString(), any());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
WRITE_ONLY);
NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
doReturn(Futures.successful(new Object())).when(mockActorContext).
- executeRemoteOperationAsync(eq(actorSelection(actorRef)),
+ executeOperationAsync(eq(actorSelection(actorRef)),
isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
public void testClose() throws Exception{
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE);
- doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqReadData());
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
transactionProxy.close();
- verify(mockActorContext).sendRemoteOperationAsync(
+ verify(mockActorContext).sendOperationAsync(
eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS));
}
}
assertEquals("member-1-shard-inventory-config", id.toString());
}
+ @Test
+ public void testFromShardIdString(){
+ String shardIdStr = "member-1-shard-inventory-config";
+
+ ShardIdentifier id = ShardIdentifier.builder().fromShardIdString(shardIdStr).build();
+ assertEquals("member-1", id.getMemberName());
+ assertEquals("inventory", id.getShardName());
+ assertEquals("config", id.getType());
+ }
}
package org.opendaylight.controller.cluster.datastore.utils;
-import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
-
+import com.google.common.base.Optional;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
}
}
- @Test
- public void testExecuteLocalShardOperationWithShardFound(){
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
-
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(true, shardActorRef));
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- Object out = actorContext.executeLocalShardOperation("default", "hello");
-
- assertEquals("hello", out);
-
-
- expectNoMsg();
- }
- };
- }};
-
- }
-
- @Test
- public void testExecuteLocalShardOperationWithShardNotFound(){
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
-
- ActorRef shardManagerActorRef = getSystem()
- .actorOf(MockShardManager.props(false, null));
-
- ActorContext actorContext =
- new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- Object out = actorContext.executeLocalShardOperation("default", "hello");
-
- assertNull(out);
-
-
- expectNoMsg();
- }
- };
- }};
-
- }
-
-
@Test
public void testFindLocalShardWithShardFound(){
new JavaTestKit(getSystem()) {{
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.findLocalShard("default");
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
- assertEquals(shardActorRef, out);
+ assertEquals(shardActorRef, out.get());
expectNoMsg();
new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
mock(Configuration.class));
- Object out = actorContext.findLocalShard("default");
-
- assertNull(out);
-
-
+ Optional<ActorRef> out = actorContext.findLocalShard("default");
+ assertTrue(!out.isPresent());
expectNoMsg();
}
};
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Object out = actorContext.executeRemoteOperation(actor, "hello");
+ Object out = actorContext.executeOperation(actor, "hello");
assertEquals("hello", out);
ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
- Future<Object> future = actorContext.executeRemoteOperationAsync(actor, "hello");
+ Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
try {
Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import com.google.common.base.Optional;
public class MockActorContext extends ActorContext {
super(actorSystem, shardManager, new MockClusterWrapper(), new MockConfiguration());
}
-
- @Override public Object executeShardOperation(String shardName,
- Object message) {
- return executeShardOperationResponse;
- }
-
- @Override public Object executeRemoteOperation(ActorSelection actor,
- Object message) {
+ @Override public Object executeOperation(ActorSelection actor,
+ Object message) {
return executeRemoteOperationResponse;
}
- @Override public ActorSelection findPrimary(String shardName) {
- return null;
+ @Override public Optional<ActorSelection> findPrimaryShard(String shardName) {
+ return Optional.absent();
}
public void setExecuteShardOperationResponse(Object response){
}
@Override
- public Object executeLocalOperation(ActorRef actor,
- Object message) {
+ public Object executeOperation(ActorRef actor,
+ Object message) {
return this.executeLocalOperationResponse;
}
- @Override
- public Object executeLocalShardOperation(String shardName,
- Object message) {
- return this.executeLocalShardOperationResponse;
- }
}
ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
Object messages = testContext
- .executeLocalOperation(actorRef, "messages");
+ .executeOperation(actorRef, "messages");
Assert.assertNotNull(messages);