atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / SyncStatusTracker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * The SyncStatusTracker tracks if a Follower is in sync with any given Leader or not
20  * When an update is received from the Leader and the update happens to be the first update
21  * from that Leader then the SyncStatusTracker will not mark the Follower as not in-sync till the
22  * Followers commitIndex matches the commitIndex that the Leader sent in it's very first update.
23  * Subsequently when an update is received the tracker will consider the Follower to be out of
24  * sync if it is behind by 'syncThreshold' commits.
25  */
26 public class SyncStatusTracker {
27     private static final class LeaderInfo {
28         final long minimumCommitIndex;
29         final String leaderId;
30
31         LeaderInfo(final String leaderId, final long minimumCommitIndex) {
32             this.leaderId = requireNonNull(leaderId);
33             this.minimumCommitIndex = minimumCommitIndex;
34         }
35     }
36
37     private static final Logger LOG = LoggerFactory.getLogger(SyncStatusTracker.class);
38
39     private static final boolean IN_SYNC = true;
40     private static final boolean NOT_IN_SYNC = false;
41
42     private final long syncThreshold;
43     private final ActorRef actor;
44     private final String id;
45
46     private LeaderInfo syncTarget;
47     private boolean syncStatus;
48
49     public SyncStatusTracker(final ActorRef actor, final String id, final long syncThreshold) {
50         this.actor = requireNonNull(actor, "actor should not be null");
51         this.id = requireNonNull(id, "id should not be null");
52         checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0");
53         this.syncThreshold = syncThreshold;
54     }
55
56     public void update(final String leaderId, final long leaderCommit, final long commitIndex) {
57         requireNonNull(leaderId, "leaderId should not be null");
58
59         if (syncTarget == null || !leaderId.equals(syncTarget.leaderId)) {
60             LOG.debug("{}: Last sync leader does not match current leader {}, need to catch up to {}", id,
61                 leaderId, leaderCommit);
62             changeSyncStatus(NOT_IN_SYNC, true);
63             syncTarget = new LeaderInfo(leaderId, leaderCommit);
64             return;
65         }
66
67         final long lag = leaderCommit - commitIndex;
68         if (lag > syncThreshold) {
69             LOG.debug("{}: Lagging {} entries behind leader {}", id, lag, leaderId);
70             changeSyncStatus(NOT_IN_SYNC, false);
71         } else if (commitIndex >= syncTarget.minimumCommitIndex) {
72             LOG.debug("{}: Lagging {} entries behind leader {} and reached {} (of expected {})", id, lag, leaderId,
73                 commitIndex, syncTarget.minimumCommitIndex);
74             changeSyncStatus(IN_SYNC, false);
75         }
76     }
77
78     private void changeSyncStatus(final boolean newSyncStatus, final boolean forceStatusChange) {
79         if (forceStatusChange || newSyncStatus != syncStatus) {
80             actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
81             syncStatus = newSyncStatus;
82         } else {
83             LOG.trace("{}: No change in sync status of, dampening message", id);
84         }
85     }
86 }