Bug 4105: Change commit retry mechanism in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index dd1c0ad6ff464959bb6fdf97220a8862d52edf8d..5025369907e46c7e9b226639f6066220590f333d 100644 (file)
@@ -162,14 +162,8 @@ public class Shard extends RaftActor {
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
-    public static Props props(final ShardIdentifier name,
-        final Map<String, String> peerAddresses,
-        final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        Preconditions.checkNotNull(name, "name should not be null");
-        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
-        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
-        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
-
+    public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
+            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
     }
 
@@ -291,7 +285,7 @@ public class Shard extends RaftActor {
                 leaderPayloadVersion);
     }
 
-    private void onDatastoreContext(DatastoreContext context) {
+    protected void onDatastoreContext(DatastoreContext context) {
         datastoreContext = context;
 
         commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
@@ -414,6 +408,16 @@ public class Shard extends RaftActor {
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
     }
 
+    protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+        try {
+            commitCoordinator.handleBatchedModifications(batched, sender, this);
+        } catch (Exception e) {
+            LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+                    batched.getTransactionID(), e);
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
     private void handleBatchedModifications(BatchedModifications batched) {
         // This message is sent to prepare the modifications transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -430,13 +434,7 @@ public class Shard extends RaftActor {
         if(isLeader()) {
             failIfIsolatedLeader(getSender());
 
-            try {
-                commitCoordinator.handleBatchedModifications(batched, getSender(), this);
-            } catch (Exception e) {
-                LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
-                        batched.getTransactionID(), e);
-                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-            }
+            handleBatchedModificationsLocal(batched, getSender());
         } else {
             ActorSelection leader = getLeader();
             if(leader != null) {
@@ -452,7 +450,7 @@ public class Shard extends RaftActor {
     }
 
     private boolean failIfIsolatedLeader(ActorRef sender) {
-        if(getRaftState() == RaftState.IsolatedLeader) {
+        if(isIsolatedLeader()) {
             sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                     "Shard %s was the leader but has lost contact with all of its followers. Either all" +
                     " other follower nodes are down or this node is isolated by a network partition.",
@@ -463,6 +461,10 @@ public class Shard extends RaftActor {
         return false;
     }
 
+    protected boolean isIsolatedLeader() {
+        return getRaftState() == RaftState.IsolatedLeader;
+    }
+
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         if (isLeader()) {
             failIfIsolatedLeader(getSender());
@@ -690,22 +692,33 @@ public class Shard extends RaftActor {
         return commitCoordinator;
     }
 
+    public DatastoreContext getDatastoreContext() {
+        return datastoreContext;
+    }
 
-    private static class ShardCreator implements Creator<Shard> {
-
+    protected abstract static class AbstractShardCreator implements Creator<Shard> {
         private static final long serialVersionUID = 1L;
 
-        final ShardIdentifier name;
-        final Map<String, String> peerAddresses;
-        final DatastoreContext datastoreContext;
-        final SchemaContext schemaContext;
+        protected final ShardIdentifier name;
+        protected final Map<String, String> peerAddresses;
+        protected final DatastoreContext datastoreContext;
+        protected final SchemaContext schemaContext;
+
+        protected AbstractShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
+                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+            this.name = Preconditions.checkNotNull(name, "name should not be null");
+            this.peerAddresses = Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+            this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
+            this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+        }
+    }
+
+    private static class ShardCreator extends AbstractShardCreator {
+        private static final long serialVersionUID = 1L;
 
         ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-            this.name = name;
-            this.peerAddresses = peerAddresses;
-            this.datastoreContext = datastoreContext;
-            this.schemaContext = schemaContext;
+            super(name, peerAddresses, datastoreContext, schemaContext);
         }
 
         @Override