Refactor odl-mdsal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 8704f2ab0cd9d33e02ab6b53a65d1aa5733fc60b..2a1537bd28ab1ca8f91b23da68ce81581af24cf5 100644 (file)
@@ -7,16 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
-import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.NotThreadSafe;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
@@ -32,17 +29,18 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
- * in the frontend/backend conversation.
+ * in the frontend/backend conversation. This class is NOT thread-safe.
  *
  * @author Robert Varga
  */
-@NotThreadSafe
 final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
@@ -50,7 +48,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories;
+    private final UnsignedLongRangeSet purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -58,6 +56,8 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private final ClientIdentifier clientId;
     private final String persistenceId;
 
+    private long lastConnectTicks;
+    private long lastSeenTicks;
     private long expectedTxSequence;
     private Long lastSeenHistory = null;
 
@@ -71,19 +71,20 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
-            clientId, tree), new HashMap<>());
+        this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+            StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
     }
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
-        final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
         final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
-        this.persistenceId = Preconditions.checkNotNull(persistenceId);
-        this.clientId = Preconditions.checkNotNull(clientId);
-        this.tree = Preconditions.checkNotNull(tree);
-        this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
-        this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
-        this.localHistories = Preconditions.checkNotNull(localHistories);
+        this.persistenceId = requireNonNull(persistenceId);
+        this.clientId = requireNonNull(clientId);
+        this.tree = requireNonNull(tree);
+        this.purgedHistories = requireNonNull(purgedHistories);
+        this.standaloneHistory = requireNonNull(standaloneHistory);
+        this.localHistories = requireNonNull(localHistories);
+        this.lastSeenTicks = tree.readTime();
     }
 
     @Override
@@ -107,7 +108,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
@@ -121,35 +122,41 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(historyId.getHistoryId())) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(purgedHistories);
+            throw new DeadHistoryException(purgedHistories.toImmutable());
         }
 
         // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
-        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now)
-            throws RequestException {
+            final RequestEnvelope envelope, final long now) {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
         if (existing == null) {
@@ -163,7 +170,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
+            final RequestEnvelope envelope, final long now) {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
         if (existing == null) {
@@ -172,7 +179,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
 
         LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        purgedHistories.add(id.getHistoryId());
         existing.purge(request.getSequence(), envelope, now);
         return null;
     }
@@ -188,9 +195,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (lhId.getHistoryId() != 0) {
                 history = localHistories.get(lhId);
                 if (history == null) {
-                    if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+                    if (purgedHistories.contains(lhId.getHistoryId())) {
                         LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
-                        throw new DeadHistoryException(purgedHistories);
+                        throw new DeadHistoryException(purgedHistories.toImmutable());
                     }
 
                     LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
@@ -208,15 +215,49 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
     void reconnect() {
         expectedTxSequence = 0;
+        lastConnectTicks = tree.readTime();
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
+    }
+
+    long getLastConnectTicks() {
+        return lastConnectTicks;
+    }
+
+    long getLastSeenTicks() {
+        return lastSeenTicks;
+    }
+
+    void touch() {
+        this.lastSeenTicks = tree.readTime();
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
-                .add("purgedHistories", purgedHistories).toString();
+        return MoreObjects.toStringHelper(LeaderFrontendState.class)
+                .add("clientId", clientId)
+                .add("nanosAgo", tree.readTime() - lastSeenTicks)
+                .add("purgedHistories", purgedHistories)
+                .toString();
     }
 }