import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
-import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
.build());
}
+ private void checkLocalShardExists(final String shardName, final ActorRef sender) {
+ if (localShards.containsKey(shardName)) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ }
+ }
+
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
// verify the local shard replica is already available in the controller node
- LOG.debug ("received AddShardReplica for shard {}", shardName);
- if (localShards.containsKey(shardName)) {
- LOG.debug ("Local shard {} already available in the controller node", shardName);
- getSender().tell(new akka.actor.Status.Failure(
- new IllegalArgumentException(String.format("Local shard %s already exists",
- shardName))), getSelf());
- return;
- }
+ LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+
+ checkLocalShardExists(shardName, getSender());
+
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
- LOG.debug ("No module configuration exists for shard {}", shardName);
- getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
- String.format("No module configuration exists for shard %s",
- shardName))), getSelf());
+ String msg = String.format("No module configuration exists for shard %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
// Create the localShard
if (schemaContext == null) {
- LOG.debug ("schemaContext is not updated to create localShardActor");
- getSender().tell(new akka.actor.Status.Failure(
- new IllegalStateException(String.format(
- "schemaContext not available to create localShardActor for %s",
- shardName))), getSelf());
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
Map<String, String> peerAddresses = getPeerAddresses(shardName);
if (peerAddresses.isEmpty()) {
- LOG.debug ("Shard peers not available for replicating shard data from leader");
- getSender().tell(new akka.actor.Status.Failure(
- new IllegalStateException(String.format(
- "Cannot add replica for shard %s because no peer is available",
- shardName))), getSelf());
+ String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- Timeout findPrimaryTimeout = new Timeout(datastoreContext
- .getShardInitializationTimeout().duration().$times(2));
+ Timeout findPrimaryTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
final ActorRef sender = getSender();
- Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
- findPrimaryTimeout);
+ Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
- LOG.debug ("Failed to receive response for FindPrimary of shard {}",
- shardName, failure);
+ LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)),
getSelf());
} else {
if (!(response instanceof RemotePrimaryShardFound)) {
- LOG.debug ("Shard leader not available for creating local shard replica {}",
- shardName);
- sender.tell(new akka.actor.Status.Failure(
- new IllegalStateException(String.format(
- "Invalid response type, %s, received from FindPrimary for shard %s",
- response.getClass().getName(), shardName))), getSelf());
+ String msg = String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
return;
}
+
RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
addShard (shardName, message, sender);
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
- private void addShard(final String shardName, final RemotePrimaryShardFound response,
- final ActorRef sender) {
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
- shardName);
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
- cluster.getCurrentMemberName());
+ private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+ checkLocalShardExists(shardName, sender);
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
getPeerAddresses(shardName), getInitShardDataStoreContext(),
new DefaultShardPropsCreator(), peerAddressResolver);
shardInfo.setActor(newShardActor(schemaContext, shardInfo));
//inform ShardLeader to add this shard as a replica by sending an AddServer message
- LOG.debug ("sending AddServer message to peer {} for shard {}",
- response.getPrimaryPath(), shardId);
+ LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ response.getPrimaryPath(), shardId);
Timeout addServerTimeout = new Timeout(datastoreContext
.getShardLeaderElectionTimeout().duration().$times(4));
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
if (failure != null) {
- LOG.debug ("AddServer request to {} for {} failed",
- response.getPrimaryPath(), shardName, failure);
+ LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+ response.getPrimaryPath(), shardName, failure);
+
// Remove the shard
localShards.remove(shardName);
if (shardInfo.getActor() != null) {
shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
}
+
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName), failure)), getSelf());
private void onAddServerReply (String shardName, ShardInformation shardInfo,
AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("Leader shard successfully added the replica shard {}",
- shardName);
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
// Make the local shard voting capable
shardInfo.setDatastoreContext(datastoreContext, getSelf());
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
- shardName);
- mBean.addLocalShard(shardId.toString());
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
sender.tell(new akka.actor.Status.Success(true), getSelf());
} else {
- LOG.warn ("Cannot add shard replica {} status {}",
- shardName, replyMsg.getStatus());
- LOG.debug ("removing the local shard replica for shard {}",
- shardName);
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ persistenceId(), shardName, replyMsg.getStatus());
+
//remove the local replica created
localShards.remove(shardName);
if (shardInfo.getActor() != null) {
shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
}
switch (replyMsg.getStatus()) {
- //case ServerChangeStatus.TIMEOUT:
case TIMEOUT:
sender.tell(new akka.actor.Status.Failure(new RuntimeException(
String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
leaderPath, shardName))), getSelf());
break;
- //case ServerChangeStatus.NO_LEADER:
case NO_LEADER:
sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
"There is no shard leader available for shard %s", shardName))), getSelf());
private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
String shardName = shardReplicaMsg.getShardName();
- boolean deleteStatus = false;
// verify the local shard replica is available in the controller node
if (!localShards.containsKey(shardName)) {
- LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
- getSender().tell(new akka.actor.Status.Failure(
- new IllegalArgumentException(String.format("Local shard %s not available",
- shardName))), getSelf());
+ String msg = String.format("Local shard %s does not", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
// call RemoveShard for the shardName