Bug 5504: Add PreLeader raft state
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardInformation.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.serialization.Serialization;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Strings;
15 import java.util.HashSet;
16 import java.util.Iterator;
17 import java.util.Map;
18 import java.util.Objects;
19 import java.util.Optional;
20 import java.util.Set;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.MemberName;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
24 import org.opendaylight.controller.cluster.datastore.Shard;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
27 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
28 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
29 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
30 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 final class ShardInformation {
38     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
39
40     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
41     private final Map<String, String> initialPeerAddresses;
42     private final ShardPeerAddressResolver addressResolver;
43     private final ShardIdentifier shardId;
44     private final String shardName;
45     private ActorRef actor;
46     private Optional<DataTree> localShardDataTree;
47     private boolean leaderAvailable = false;
48
49     // flag that determines if the actor is ready for business
50     private boolean actorInitialized = false;
51
52     private boolean followerSyncStatus = false;
53
54     private String role ;
55     private String leaderId;
56     private short leaderVersion;
57
58     private DatastoreContext datastoreContext;
59     private Shard.AbstractBuilder<?, ?> builder;
60     private boolean isActiveMember = true;
61
62     ShardInformation(String shardName, ShardIdentifier shardId,
63             Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
64             Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
65         this.shardName = shardName;
66         this.shardId = shardId;
67         this.initialPeerAddresses = initialPeerAddresses;
68         this.datastoreContext = datastoreContext;
69         this.builder = builder;
70         this.addressResolver = addressResolver;
71     }
72
73     Props newProps(SchemaContext schemaContext) {
74         Preconditions.checkNotNull(builder);
75         Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
76                 schemaContext(schemaContext).props();
77         builder = null;
78         return props;
79     }
80
81     String getShardName() {
82         return shardName;
83     }
84
85     @Nullable
86     ActorRef getActor(){
87         return actor;
88     }
89
90     void setActor(ActorRef actor) {
91         this.actor = actor;
92     }
93
94     ShardIdentifier getShardId() {
95         return shardId;
96     }
97
98     void setLocalDataTree(Optional<DataTree> localShardDataTree) {
99         this.localShardDataTree = localShardDataTree;
100     }
101
102     Optional<DataTree> getLocalShardDataTree() {
103         return localShardDataTree;
104     }
105
106     DatastoreContext getDatastoreContext() {
107         return datastoreContext;
108     }
109
110     void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
111         this.datastoreContext = datastoreContext;
112         if (actor != null) {
113             LOG.debug("Sending new DatastoreContext to {}", shardId);
114             actor.tell(this.datastoreContext, sender);
115         }
116     }
117
118     void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
119         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
120
121         if(actor != null) {
122             if(LOG.isDebugEnabled()) {
123                 LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
124                         peerId, peerAddress, actor.path());
125             }
126
127             actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
128         }
129
130         notifyOnShardInitializedCallbacks();
131     }
132
133     void peerDown(MemberName memberName, String peerId, ActorRef sender) {
134         if(actor != null) {
135             actor.tell(new PeerDown(memberName, peerId), sender);
136         }
137     }
138
139     void peerUp(MemberName memberName, String peerId, ActorRef sender) {
140         if(actor != null) {
141             actor.tell(new PeerUp(memberName, peerId), sender);
142         }
143     }
144
145     boolean isShardReady() {
146         return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
147     }
148
149     boolean isShardReadyWithLeaderId() {
150         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
151                 !RaftState.PreLeader.name().equals(role) &&
152                     (isLeader() || addressResolver.resolve(leaderId) != null);
153     }
154
155     boolean isShardInitialized() {
156         return getActor() != null && actorInitialized;
157     }
158
159     boolean isLeader() {
160         return Objects.equals(leaderId, shardId.toString());
161     }
162
163     String getSerializedLeaderActor() {
164         if(isLeader()) {
165             return Serialization.serializedActorPath(getActor());
166         } else {
167             return addressResolver.resolve(leaderId);
168         }
169     }
170
171     void setActorInitialized() {
172         LOG.debug("Shard {} is initialized", shardId);
173
174         this.actorInitialized = true;
175
176         notifyOnShardInitializedCallbacks();
177     }
178
179     private void notifyOnShardInitializedCallbacks() {
180         if(onShardInitializedSet.isEmpty()) {
181             return;
182         }
183
184         boolean ready = isShardReadyWithLeaderId();
185
186         LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
187             ready ? "ready" : "initialized", onShardInitializedSet.size());
188
189         Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
190         while(iter.hasNext()) {
191             OnShardInitialized onShardInitialized = iter.next();
192             if (!(onShardInitialized instanceof OnShardReady) || ready) {
193                 iter.remove();
194                 onShardInitialized.getTimeoutSchedule().cancel();
195                 onShardInitialized.getReplyRunnable().run();
196             }
197         }
198     }
199
200     void addOnShardInitialized(OnShardInitialized onShardInitialized) {
201         onShardInitializedSet.add(onShardInitialized);
202     }
203
204     void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
205         onShardInitializedSet.remove(onShardInitialized);
206     }
207
208     void setRole(String newRole) {
209         this.role = newRole;
210
211         notifyOnShardInitializedCallbacks();
212     }
213
214     void setFollowerSyncStatus(boolean syncStatus){
215         this.followerSyncStatus = syncStatus;
216     }
217
218     boolean isInSync(){
219         if(RaftState.Follower.name().equals(this.role)){
220             return followerSyncStatus;
221         } else if(RaftState.Leader.name().equals(this.role)){
222             return true;
223         }
224
225         return false;
226     }
227
228     boolean setLeaderId(String leaderId) {
229         boolean changed = !Objects.equals(this.leaderId, leaderId);
230         this.leaderId = leaderId;
231         if(leaderId != null) {
232             this.leaderAvailable = true;
233         }
234         notifyOnShardInitializedCallbacks();
235
236         return changed;
237     }
238
239     String getLeaderId() {
240         return leaderId;
241     }
242
243     void setLeaderAvailable(boolean leaderAvailable) {
244         this.leaderAvailable = leaderAvailable;
245
246         if(leaderAvailable) {
247             notifyOnShardInitializedCallbacks();
248         }
249     }
250
251     short getLeaderVersion() {
252         return leaderVersion;
253     }
254
255     void setLeaderVersion(short leaderVersion) {
256         this.leaderVersion = leaderVersion;
257     }
258
259     boolean isActiveMember() {
260         return isActiveMember;
261     }
262
263     void setActiveMember(boolean isActiveMember) {
264         this.isActiveMember = isActiveMember;
265     }
266 }