Bug 1577: Gates access to Shard actor until its initialized
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 6cf16b44268c6c16e26e0658632f61994ee33971..19d9a66a528eb417d5bff41948641b68e9c8e481 100644 (file)
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -156,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(remoteTransactionActorsMB.get()) {
                 for(ActorSelection actor : remoteTransactionActors) {
                     LOG.trace("Sending CloseTransaction to {}", actor);
-                    actorContext.sendRemoteOperationAsync(actor,
+                    actorContext.sendOperationAsync(actor,
                             new CloseTransaction().toSerializable());
                 }
             }
@@ -379,9 +380,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         try {
-            Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
-                    getTransactionChainId()).toSerializable());
+            Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
+            if (!primaryShard.isPresent()) {
+                throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+            }
+
+            Object response = actorContext.executeOperation(primaryShard.get(),
+                    new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+                            getTransactionChainId()).toSerializable());
             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 CreateTransactionReply reply =
                     CreateTransactionReply.fromSerializable(response);
@@ -502,7 +508,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} closeTransaction called", identifier);
             }
-            actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
+            actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
         }
 
         @Override
@@ -513,7 +519,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
             // Send the ReadyTransaction message to the Tx actor.
 
-            final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
+            final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
                     new ReadyTransaction().toSerializable());
 
             // Combine all the previously recorded put/merge/delete operation reply Futures and the
@@ -576,8 +582,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
-                    new DeleteData(path).toSerializable() ));
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+                    new DeleteData(path).toSerializable()));
         }
 
         @Override
@@ -585,7 +591,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
                     new MergeData(path, data, schemaContext).toSerializable()));
         }
 
@@ -594,7 +600,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
             }
-            recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+            recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
                     new WriteData(path, data, schemaContext).toSerializable()));
         }
 
@@ -686,7 +692,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
                     new ReadData(path).toSerializable());
             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }
@@ -773,7 +779,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             };
 
-            Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+            Future<Object> future = actorContext.executeOperationAsync(getActor(),
                     new DataExists(path).toSerializable());
             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
         }