X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FLeaderFrontendState.java;h=409268501a4f5e6914b0ca61f7aadcebe8c35037;hb=583f30d1c7a8199b401c9393745c62fe27b5ced8;hp=297759b5c86ffd798b0612ce32e96fe6f7e6af25;hpb=cd2a6fa0d8fa6281be28d3c7b9828ecf4e932811;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 297759b5c8..409268501a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -47,10 +47,10 @@ final class LeaderFrontendState implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); // Histories which have not been purged - private final Map localHistories = new HashMap<>(); + private final Map localHistories; // RangeSet performs automatic merging, hence we keep minimal state tracking information - private final RangeSet purgedHistories = TreeRangeSet.create(); + private final RangeSet purgedHistories; // Used for all standalone transactions private final AbstractFrontendHistory standaloneHistory; @@ -61,7 +61,6 @@ final class LeaderFrontendState implements Identifiable { private long expectedTxSequence; private Long lastSeenHistory = null; - // TODO: explicit failover notification // Record the ActorRef for the originating actor and when we switch to being a leader send a notification // to the frontend client -- that way it can immediately start sending requests @@ -72,10 +71,19 @@ final class LeaderFrontendState implements Identifiable { // - per-RequestException throw counters LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId, + clientId, tree), new HashMap<>()); + } + + LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree, + final RangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory, + final Map localHistories) { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); this.tree = Preconditions.checkNotNull(tree); - standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree); + this.purgedHistories = Preconditions.checkNotNull(purgedHistories); + this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory); + this.localHistories = Preconditions.checkNotNull(localHistories); } @Override @@ -101,9 +109,9 @@ final class LeaderFrontendState implements Identifiable { if (request instanceof CreateLocalHistoryRequest) { return handleCreateHistory((CreateLocalHistoryRequest) request); } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request, now); + return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now); } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request, now); + return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now); } else { throw new UnsupportedRequestException(request); } @@ -133,12 +141,13 @@ final class LeaderFrontendState implements Identifiable { lastSeenHistory = id.getHistoryId(); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id))); + localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id)); LOG.debug("{}: created history {}", persistenceId, id); return new LocalHistorySuccess(id, request.getSequence()); } - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now) + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.get(id); @@ -148,29 +157,23 @@ final class LeaderFrontendState implements Identifiable { return new LocalHistorySuccess(id, request.getSequence()); } - return existing.destroy(request.getSequence(), now); + existing.destroy(request.getSequence(), envelope, now); + return null; } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now) - throws RequestException { + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.remove(id); - if (existing != null) { - purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); - - if (!existing.isDestroyed()) { - LOG.warn("{}: purging undestroyed history {}", persistenceId, id); - existing.destroy(request.getSequence(), now); - } - - // FIXME: record a PURGE tombstone in the journal - - LOG.debug("{}: purged history {}", persistenceId, id); - } else { + if (existing == null) { LOG.debug("{}: history {} has already been purged", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); } - return new LocalHistorySuccess(id, request.getSequence()); + LOG.debug("{}: purging history {}", persistenceId, id); + purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + existing.purge(request.getSequence(), envelope, now); + return null; } @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request,