X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=1be84643350acdd197f5ab5dfe10dfcf558b146e;hb=d6ed0a044d591d65847714451d97d80345154089;hp=519763ac021989df012cb6ac05eba80140acdfb3;hpb=b4d95acff78952020e9fbde4372d13b461fd7469;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 519763ac02..1be8464335 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -49,7 +49,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); @GuardedBy("this") - private final Map openTransactions = new HashMap<>(); + private final Map> openTransactions = new HashMap<>(); @GuardedBy("this") private final Map readyTransactions = new HashMap<>(); @@ -99,7 +99,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Force-closing history {}", getIdentifier(), cause); synchronized (this) { - for (ClientTransaction t : openTransactions.values()) { + for (AbstractClientHandle t : openTransactions.values()) { t.localAbort(cause); } openTransactions.clear(); @@ -136,32 +136,41 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Create history response {}", response); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { - final ProxyHistory history; try { - history = histories.computeIfAbsent(shard, this::createHistoryProxy); + return histories.computeIfAbsent(shard, this::createHistoryProxy); } catch (InversibleLockException e) { LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard); e.awaitResolution(); LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard); - continue; } + } + } + + final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true); + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false); + } - return history.createTransactionProxy(transactionId); + private void checkNotClosed() { + if (state == State.CLOSED) { + throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } /** - * Allocate a {@link ClientTransaction}. + * Allocate a new {@link ClientTransaction}. * * @return A new {@link ClientTransaction} * @throws TransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed */ public final ClientTransaction createTransaction() { - if (state == State.CLOSED) { - throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); - } + checkNotClosed(); synchronized (this) { final ClientTransaction ret = doCreateTransaction(); @@ -170,6 +179,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } } + /** + * Create a new {@link ClientSnapshot}. + * + * @return A new {@link ClientSnapshot} + * @throws TransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed + */ + public final ClientSnapshot takeSnapshot() { + checkNotClosed(); + + synchronized (this) { + final ClientSnapshot ret = doCreateSnapshot(); + openTransactions.put(ret.getIdentifier(), ret); + return ret; + } + } + + @GuardedBy("this") + abstract ClientSnapshot doCreateSnapshot(); + @GuardedBy("this") abstract ClientTransaction doCreateTransaction(); @@ -179,10 +208,12 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * @param txId Transaction identifier * @param cohort Transaction commit cohort */ - synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, final AbstractTransactionCommitCohort cohort) { - final ClientTransaction tx = openTransactions.remove(txId); - Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId); + final TransactionIdentifier txId = tx.getIdentifier(); + if (openTransactions.remove(txId) == null) { + LOG.warn("Transaction {} not recorded, proceeding with readiness", txId); + } final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", @@ -196,11 +227,11 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching * backend. * - * @param txId transaction identifier + * @param snapshot transaction identifier */ - synchronized void onTransactionAbort(final TransactionIdentifier txId) { - if (openTransactions.remove(txId) == null) { - LOG.warn("Could not find aborting transaction {}", txId); + synchronized void onTransactionAbort(final AbstractClientHandle snapshot) { + if (openTransactions.remove(snapshot.getIdentifier()) == null) { + LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier()); } }