Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
BUG-8704: rework seal mechanics to not wait during replay
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
databroker
/
actors
/
dds
/
RemoteProxyTransaction.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
index 3b5f80ffe8bc02ee267620f7bfafdbcd1cf01a2a..1ba96426df75feb7e38e4db0f217c8c4f98708b8 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
@@
-146,9
+146,7
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
private void ensureFlushedBuider() {
}
private void ensureFlushedBuider() {
- if (builderBusy) {
- flushBuilder();
- }
+ ensureFlushedBuider(Optional.absent());
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
}
private void ensureFlushedBuider(final Optional<Long> enqueuedTicks) {
@@
-157,10
+155,6
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
}
}
}
}
- private void flushBuilder() {
- flushBuilder(Optional.absent());
- }
-
private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
private void flushBuilder(final Optional<Long> enqueuedTicks) {
final ModifyTransactionRequest request = builder.build();
builderBusy = false;
@@
-253,28
+247,33
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
ModifyTransactionRequest abortRequest() {
ensureInitializedBuilder();
builder.setAbort();
- final ModifyTransactionRequest ret = builder.build();
builderBusy = false;
builderBusy = false;
- return
ret
;
+ return
builder.build()
;
}
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
ensureInitializedBuilder();
builder.setCommit(coordinated);
}
@Override
ModifyTransactionRequest commitRequest(final boolean coordinated) {
ensureInitializedBuilder();
builder.setCommit(coordinated);
+ builderBusy = false;
+ return builder.build();
+ }
- final ModifyTransactionRequest ret = builder.build();
+ private ModifyTransactionRequest readyRequest() {
+ ensureInitializedBuilder();
+ builder.setReady();
builderBusy = false;
builderBusy = false;
- return
ret
;
+ return
builder.build()
;
}
@Override
}
@Override
-
void doSeal(
) {
+
boolean sealAndSend(final Optional<Long> enqueuedTicks
) {
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
if (sendReadyOnSeal) {
ensureInitializedBuilder();
builder.setReady();
- flushBuilder();
+ flushBuilder(
enqueuedTicks
);
}
}
+ return super.sealAndSend(enqueuedTicks);
}
@Override
}
@Override
@@
-300,7
+299,11
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
@@
-326,7
+329,11
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
});
break;
case READY:
});
break;
case READY:
- //no op
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
@@
-424,7
+431,11
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
if (maybeProto.isPresent()) {
- ensureSealed();
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
final TransactionRequest<?> tmp;
switch (maybeProto.get()) {
@@
-450,7
+461,11
@@
final class RemoteProxyTransaction extends AbstractProxyTransaction {
}, enqueuedTicks);
break;
case READY:
}, enqueuedTicks);
break;
case READY:
- //no op
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
break;
default:
throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());