X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=715f48c3492156d1b14005462da2c26aacb1768c;hb=25bcd8a39d1c06f45d7f567d1f240276c7007310;hp=19d9a66a528eb417d5bff41948641b68e9c8e481;hpb=b06d2c5bbffa48b1e219ac92cf0be60528aff34a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 19d9a66a52..715f48c349 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; @@ -158,7 +157,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); actorContext.sendOperationAsync(actor, - new CloseTransaction().toSerializable()); + new CloseTransaction().toSerializable()); } } } @@ -315,7 +314,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); } - List> cohortPathFutures = Lists.newArrayList(); + List> cohortFutures = Lists.newArrayList(); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { @@ -323,14 +322,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); } - cohortPathFutures.add(transactionContext.readyTransaction()); + cohortFutures.add(transactionContext.readyTransaction()); } if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortPathFutures); + transactionChainProxy.onTransactionReady(cohortFutures); } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, + return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } @@ -386,8 +385,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } Object response = actorContext.executeOperation(primaryShard.get(), - new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -409,8 +408,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { remoteTransactionActorsMB.set(true); } + // TxActor is always created where the leader of the shard is. + // Check if TxActor is created in the same node + boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); + transactionContext = new TransactionContextImpl(shardName, transactionPath, - transactionActor, identifier, actorContext, schemaContext); + transactionActor, identifier, actorContext, schemaContext, isTxActorLocal); remoteTransactionPaths.put(shardName, transactionContext); } else { @@ -439,7 +442,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void closeTransaction(); - Future readyTransaction(); + Future readyTransaction(); void writeData(YangInstanceIdentifier path, NormalizedNode data); @@ -484,25 +487,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { private final SchemaContext schemaContext; private final String actorPath; private final ActorSelection actor; + private final boolean isTxActorLocal; private TransactionContextImpl(String shardName, String actorPath, ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext, - SchemaContext schemaContext) { + SchemaContext schemaContext, boolean isTxActorLocal) { super(shardName, identifier); this.actorPath = actorPath; this.actor = actor; this.actorContext = actorContext; this.schemaContext = schemaContext; + this.isTxActorLocal = isTxActorLocal; } private ActorSelection getActor() { return actor; } - private String getResolvedCohortPath(String cohortPath) { - return actorContext.resolvePath(actorPath, cohortPath); - } - @Override public void closeTransaction() { if(LOG.isDebugEnabled()) { @@ -512,15 +513,16 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public Future readyTransaction() { + public Future readyTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); } // Send the ReadyTransaction message to the Tx actor. + ReadyTransaction readyTransaction = new ReadyTransaction(); final Future replyFuture = actorContext.executeOperationAsync(getActor(), - new ReadyTransaction().toSerializable()); + isTxActorLocal ? readyTransaction : readyTransaction.toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the // ReadyTransactionReply Future into one Future. If any one fails then the combined @@ -538,9 +540,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Transform the combined Future into a Future that returns the cohort actor path from // the ReadyTransactionReply. That's the end result of the ready operation. - return combinedFutures.transform(new AbstractFunction1, ActorPath>() { + return combinedFutures.transform(new AbstractFunction1, ActorSelection>() { @Override - public ActorPath apply(Iterable notUsed) { + public ActorSelection apply(Iterable notUsed) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", identifier); @@ -554,22 +556,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Note the Future get call here won't block as it's complete. Object serializedReadyReply = replyFuture.value().get().get(); - if(serializedReadyReply.getClass().equals( - ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( - actorContext.getActorSystem(), serializedReadyReply); + if (serializedReadyReply instanceof ReadyTransactionReply) { + return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - String resolvedCohortPath = getResolvedCohortPath( - reply.getCohortPath().toString()); + } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { + ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(reply.getCohortPath()); - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readyTransaction: resolved cohort path {}", - identifier, resolvedCohortPath); - } - return actorContext.actorFor(resolvedCohortPath); } else { // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("Invalid reply type {}", serializedReadyReply.getClass())); } @@ -582,8 +577,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); } + + DeleteData deleteData = new DeleteData(path); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new DeleteData(path).toSerializable())); + isTxActorLocal ? deleteData : deleteData.toSerializable())); } @Override @@ -591,8 +588,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); } + + MergeData mergeData = new MergeData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new MergeData(path, data, schemaContext).toSerializable())); + isTxActorLocal ? mergeData : mergeData.toSerializable())); } @Override @@ -600,8 +599,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} writeData called path = {}", identifier, path); } + + WriteData writeData = new WriteData(path, data, schemaContext); recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), - new WriteData(path, data, schemaContext).toSerializable())); + isTxActorLocal ? writeData : writeData.toSerializable())); } @Override @@ -631,6 +632,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { Future> combinedFutures = akka.dispatch.Futures.sequence( Lists.newArrayList(recordedOperationFutures), actorContext.getActorSystem().dispatcher()); + OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) @@ -675,25 +677,27 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} read operation succeeded", identifier, failure); } - if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { - ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, - path, readResponse); - if (reply.getNormalizedNode() == null) { - returnFuture.set(Optional.>absent()); - } else { - returnFuture.set(Optional.>of( - reply.getNormalizedNode())); - } + + if (readResponse instanceof ReadDataReply) { + ReadDataReply reply = (ReadDataReply) readResponse; + returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + + } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { + ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse); + returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); + } else { returnFuture.setException(new ReadFailedException( - "Invalid response reading data for path " + path)); + "Invalid response reading data for path " + path)); } } } }; + ReadData readData = new ReadData(path); Future readFuture = actorContext.executeOperationAsync(getActor(), - new ReadData(path).toSerializable()); + isTxActorLocal ? readData : readData.toSerializable()); + readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -768,9 +772,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} dataExists operation succeeded", identifier, failure); } - if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - returnFuture.set(Boolean.valueOf(DataExistsReply. - fromSerializable(response).exists())); + + if (response instanceof DataExistsReply) { + returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); + + } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { + returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists())); + } else { returnFuture.setException(new ReadFailedException( "Invalid response checking exists for path " + path)); @@ -779,8 +787,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; + DataExists dataExists = new DataExists(path); Future future = actorContext.executeOperationAsync(getActor(), - new DataExists(path).toSerializable()); + isTxActorLocal ? dataExists : dataExists.toSerializable()); + future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } } @@ -805,7 +815,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public Future readyTransaction() { + public Future readyTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction called", identifier); }