X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=d860bfa289e39f7b9a144aa652a5a030ec4c4546;hp=2b1812319dc32705818cf8b6bb7e75c55e5648ce;hb=8ed5603b4f3503559f2f85137c42d7605ebfd3e9;hpb=12a8fdc7dd7631c9cac90b8961120ba405ea0b95 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 2b1812319d..d860bfa289 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; -import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException; import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -91,9 +91,9 @@ final class LeaderFrontendState implements Identifiable { return clientId; } - private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException { + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException { if (expectedTxSequence != envelope.getTxSequence()) { - throw new OutOfOrderRequestException(expectedTxSequence); + throw new OutOfSequenceEnvelopeException(expectedTxSequence); } } @@ -107,12 +107,13 @@ final class LeaderFrontendState implements Identifiable { try { if (request instanceof CreateLocalHistoryRequest) { - return handleCreateHistory((CreateLocalHistoryRequest) request); + return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now); } else if (request instanceof DestroyLocalHistoryRequest) { return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); } else if (request instanceof PurgeLocalHistoryRequest) { return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); } else { + LOG.warn("{}: rejecting unsupported request {}", persistenceId, request); throw new UnsupportedRequestException(request); } } finally { @@ -120,30 +121,37 @@ final class LeaderFrontendState implements Identifiable { } } - private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { - final LocalHistoryIdentifier id = request.getTarget(); - final AbstractFrontendHistory existing = localHistories.get(id); + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + final LocalHistoryIdentifier historyId = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(historyId); if (existing != null) { // History already exists: report success - LOG.debug("{}: history {} already exists", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: history {} already exists", persistenceId, historyId); + return new LocalHistorySuccess(historyId, request.getSequence()); } // We have not found the history. Before we create it we need to check history ID sequencing so that we do not // end up resurrecting a purged history. - if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { + if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) { LOG.debug("{}: rejecting purged request {}", persistenceId, request); - throw new DeadHistoryException(lastSeenHistory.longValue()); + throw new DeadHistoryException(purgedHistories); } // Update last history we have seen - if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { - lastSeenHistory = id.getHistoryId(); + if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) { + lastSeenHistory = historyId.getHistoryId(); } - localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id)); - LOG.debug("{}: created history {}", persistenceId, id); - return new LocalHistorySuccess(id, request.getSequence()); + // We have to send the response only after persistence has completed + final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> { + LOG.debug("{}: persisted history {}", persistenceId, historyId); + envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now); + }); + + localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain)); + LOG.debug("{}: created history {}", persistenceId, historyId); + return null; } private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, @@ -171,7 +179,8 @@ final class LeaderFrontendState implements Identifiable { } LOG.debug("{}: purging history {}", persistenceId, id); - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId()); + purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul))); existing.purge(request.getSequence(), envelope, now); return null; } @@ -187,7 +196,12 @@ final class LeaderFrontendState implements Identifiable { if (lhId.getHistoryId() != 0) { history = localHistories.get(lhId); if (history == null) { - LOG.debug("{}: rejecting unknown history request {}", persistenceId, request); + if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) { + LOG.warn("{}: rejecting request {} to purged history", persistenceId, request); + throw new DeadHistoryException(purgedHistories); + } + + LOG.warn("{}: rejecting unknown history request {}", persistenceId, request); throw new UnknownHistoryException(lastSeenHistory); } } else {