X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=5b8a50e18896c0a3e10db64c10b1d6fdffdc41d8;hb=a51163ead1d60e66eeaf3691adb70b019ce60fb2;hp=f7b3461d333f9c157dbeb06208eed276080c297d;hpb=066c58eb317f11d8cbe84d31ac4d5a5b034f1f2c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index f7b3461d33..5b8a50e188 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -121,7 +121,7 @@ public class Shard extends RaftActor { private ShardSnapshot restoreFromSnapshot; - + private final ShardTransactionMessageRetrySupport messageRetrySupport; protected Shard(AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -163,8 +163,7 @@ public class Shard extends RaftActor { snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name); - - + messageRetrySupport = new ShardTransactionMessageRetrySupport(this); } private void setTransactionCommitTimeout() { @@ -184,6 +183,8 @@ public class Shard extends RaftActor { super.postStop(); + messageRetrySupport.close(); + if(txCommitTimeoutCheckSchedule != null) { txCommitTimeoutCheckSchedule.cancel(); } @@ -265,6 +266,8 @@ public class Shard extends RaftActor { sender().tell(store.getDataTree(), self()); } else if(message instanceof ServerRemoved){ context().parent().forward(message, context()); + } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { + messageRetrySupport.onTimerMessage(message); } else { super.onReceiveCommand(message); } @@ -410,12 +413,6 @@ public class Shard extends RaftActor { commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } - private void noLeaderError(String errMessage) { - // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make - // it more resilient in case we're in the process of electing a new leader. - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf()); - } - protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { try { commitCoordinator.handleBatchedModifications(batched, sender, this); @@ -452,7 +449,8 @@ public class Shard extends RaftActor { LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); leader.forward(batched, getContext()); } else { - noLeaderError("Could not commit transaction " + batched.getTransactionID()); + messageRetrySupport.addMessageToRetry(batched, getSender(), + "Could not commit transaction " + batched.getTransactionID()); } } } @@ -491,7 +489,8 @@ public class Shard extends RaftActor { message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(message, getContext()); } else { - noLeaderError("Could not commit transaction " + message.getTransactionID()); + messageRetrySupport.addMessageToRetry(message, getSender(), + "Could not commit transaction " + message.getTransactionID()); } } } @@ -511,7 +510,8 @@ public class Shard extends RaftActor { readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion()); leader.forward(readyLocal, getContext()); } else { - noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID()); + messageRetrySupport.addMessageToRetry(forwardedReady, getSender(), + "Could not commit transaction " + forwardedReady.getTransactionID()); } } } @@ -711,6 +711,10 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(String oldLeader, String newLeader) { shardMBean.incrementLeadershipChangeCount(); + + if(hasLeader()) { + messageRetrySupport.retryMessages(); + } } @Override