Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
BUG-8618: rework AbstractProxyTransaction.flushState()
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
databroker
/
actors
/
dds
/
AbstractProxyTransaction.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
index 07b89e09230949da6c4849b3fb5dc03d4c3c36d8..51f528150d22a717034cc4281618591645cca92f 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
@@
-11,7
+11,6
@@
import akka.actor.ActorRef;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.base.Verify;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
@@
-32,6
+31,7
@@
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@
-138,7
+138,7
@@
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for latch of {}", successor);
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for latch of {}", successor);
- throw
Throwables.propagate
(e);
+ throw
new RuntimeException
(e);
}
return successor;
}
}
return successor;
}
@@
-351,7
+351,10
@@
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
// At this point the successor has completed transition and is possibly visible by the user thread, which is
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
// At this point the successor has completed transition and is possibly visible by the user thread, which is
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
- flushState(successor);
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ forwardToSuccessor(successor, optState.get(), null);
+ }
successor.predecessorSealed();
}
successor.predecessorSealed();
}
@@
-364,7
+367,7
@@
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
void sealOnly() {
parent.onTransactionSealed(this);
final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
void sealOnly() {
parent.onTransactionSealed(this);
final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
- Verify.verify(success, "Attempted to replay seal on
{}
", this);
+ Verify.verify(success, "Attempted to replay seal on
%s
", this);
}
/**
}
/**
@@
-719,9
+722,13
@@
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
*/
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
*/
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
- flushState(successor);
+ final long enqueuedTicks = parent.currentTime();
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+ }
if (successor.markSealed()) {
if (successor.markSealed()) {
- successor.sealAndSend(Optional.of(
parent.currentTime()
));
+ successor.sealAndSend(Optional.of(
enqueuedTicks
));
}
}
}
}
}
}
@@
-795,7
+802,7
@@
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
@GuardedBy("this")
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
@GuardedBy("this")
- abstract
void flushState(AbstractProxyTransaction successor
);
+ abstract
java.util.Optional<ModifyTransactionRequest> flushState(
);
abstract TransactionRequest<?> abortRequest();
abstract TransactionRequest<?> abortRequest();