X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalProxyTransaction.java;h=02f8e5a9532abd1a8ad45c2824c205694177a762;hb=7933189a19f5c0f58b91baea300a2f512aac154f;hp=9f4b18eaaa73e07d91d12d2f3639c202a1eee436;hpb=225fa198c5964a505695afb60f4adbcd9e9bebf5;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 9f4b18eaaa..02f8e5a953 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -70,7 +70,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { abstract DataTreeSnapshot readOnlyView(); - abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request, @Nullable Consumer> callback); abstract void replayModifyTransactionRequest(ModifyTransactionRequest request, @@ -87,9 +87,8 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } @Override - final void doAbort() { - sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), - response -> LOG.debug("Transaction {} abort completed with {}", identifier, response)); + final AbortLocalTransactionRequest abortRequest() { + return new AbortLocalTransactionRequest(identifier, localActor()); } @Override @@ -104,15 +103,27 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { private boolean handleReadRequest(final TransactionRequest request, final @Nullable Consumer> callback) { + // Note we delay completion of read requests to limit the scope at which the client can run, as they have + // listeners, which we do not want to execute while we are reconnecting. if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); final Optional> result = readOnlyView().readNode(path); - callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ReadTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } return true; } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); final boolean result = readOnlyView().readNode(path).isPresent(); - callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + if (callback != null) { + // XXX: FB does not see that callback is final, on stack and has be check for non-null. + final Consumer> fbIsStupid = Preconditions.checkNotNull(callback); + executeInActor(() -> fbIsStupid.accept(new ExistsTransactionSuccess(request.getTarget(), + request.getSequence(), result))); + } return true; } else { return false; @@ -127,7 +138,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - enqueuePurge(enqueuedTicks); + enqueuePurge(callback, enqueuedTicks); } else if (request instanceof IncrementTransactionSequenceRequest) { // Local transactions do not have non-replayable requests which would be visible to the backend, // hence we can skip sequence increments. @@ -147,11 +158,11 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { */ void handleForwardedRemoteRequest(final TransactionRequest request, final Consumer> callback) { if (request instanceof ModifyTransactionRequest) { - applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + applyForwardedModifyTransactionRequest((ModifyTransactionRequest) request, callback); } else if (handleReadRequest(request, callback)) { // No-op } else if (request instanceof TransactionPurgeRequest) { - sendPurge(); + enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -191,7 +202,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { successor.abort(); } else if (request instanceof TransactionPurgeRequest) { LOG.debug("Forwarding purge {} to successor {}", request, successor); - successor.sendPurge(); + successor.enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -203,7 +214,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); } else if (request instanceof TransactionPurgeRequest) { - successor.sendPurge(); + successor.enqueuePurge(callback); } else { throw new IllegalArgumentException("Unhandled request" + request); }