Remove PeerUp/Down messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardInformation.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.serialization.Serialization;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import java.util.HashSet;
18 import java.util.Iterator;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.Set;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
25 import org.opendaylight.controller.cluster.datastore.Shard;
26 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
28 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
29 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
32 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 @VisibleForTesting
37 public final class ShardInformation {
38     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
39
40     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
41     private final Map<String, String> initialPeerAddresses;
42     private final ShardPeerAddressResolver addressResolver;
43     private final ShardIdentifier shardId;
44     private final String shardName;
45
46     // This reference indirection is required to have the ability to update the SchemaContext
47     // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
48     // it from becoming garbage.
49     private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
50     private ActorRef actor;
51
52     private Optional<ReadOnlyDataTree> localShardDataTree;
53     private boolean leaderAvailable = false;
54
55     // flag that determines if the actor is ready for business
56     private boolean actorInitialized = false;
57
58     private boolean followerSyncStatus = false;
59
60     private String role ;
61     private String leaderId;
62     private short leaderVersion;
63
64     private DatastoreContext datastoreContext;
65     private Shard.AbstractBuilder<?, ?> builder;
66     private boolean activeMember = true;
67
68     ShardInformation(final String shardName, final ShardIdentifier shardId,
69             final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
70             final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
71         this.shardName = shardName;
72         this.shardId = shardId;
73         this.initialPeerAddresses = initialPeerAddresses;
74         this.datastoreContext = datastoreContext;
75         this.builder = builder;
76         this.addressResolver = addressResolver;
77     }
78
79     Props newProps() {
80         Props props = requireNonNull(builder).id(shardId).peerAddresses(initialPeerAddresses)
81                 .datastoreContext(datastoreContext).schemaContextProvider(schemaContextProvider).props();
82         builder = null;
83         return props;
84     }
85
86     String getShardName() {
87         return shardName;
88     }
89
90     @VisibleForTesting
91     @Nullable public ActorRef getActor() {
92         return actor;
93     }
94
95     void setActor(final ActorRef actor) {
96         this.actor = actor;
97     }
98
99     ShardIdentifier getShardId() {
100         return shardId;
101     }
102
103     void setLocalDataTree(final Optional<ReadOnlyDataTree> dataTree) {
104         this.localShardDataTree = dataTree;
105     }
106
107     Optional<ReadOnlyDataTree> getLocalShardDataTree() {
108         return localShardDataTree;
109     }
110
111     DatastoreContext getDatastoreContext() {
112         return datastoreContext;
113     }
114
115     void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
116         this.datastoreContext = newDatastoreContext;
117         if (actor != null) {
118             LOG.debug("Sending new DatastoreContext to {}", shardId);
119             actor.tell(this.datastoreContext, sender);
120         }
121     }
122
123     void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
124         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
125
126         if (actor != null) {
127             LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
128                     peerAddress, actor.path());
129
130             actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
131         }
132
133         notifyOnShardInitializedCallbacks();
134     }
135
136     boolean isShardReady() {
137         return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
138     }
139
140     boolean isShardReadyWithLeaderId() {
141         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
142                 && !RaftState.PreLeader.name().equals(role)
143                 && (isLeader() || addressResolver.resolve(leaderId) != null);
144     }
145
146     boolean isShardInitialized() {
147         return getActor() != null && actorInitialized;
148     }
149
150     boolean isLeader() {
151         return Objects.equals(leaderId, shardId.toString());
152     }
153
154     String getSerializedLeaderActor() {
155         if (isLeader()) {
156             return Serialization.serializedActorPath(getActor());
157         } else {
158             return addressResolver.resolve(leaderId);
159         }
160     }
161
162     void setActorInitialized() {
163         LOG.debug("Shard {} is initialized", shardId);
164
165         this.actorInitialized = true;
166
167         notifyOnShardInitializedCallbacks();
168     }
169
170     private void notifyOnShardInitializedCallbacks() {
171         if (onShardInitializedSet.isEmpty()) {
172             return;
173         }
174
175         boolean ready = isShardReadyWithLeaderId();
176
177         LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
178             ready ? "ready" : "initialized", onShardInitializedSet.size());
179
180         Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
181         while (iter.hasNext()) {
182             OnShardInitialized onShardInitialized = iter.next();
183             if (!(onShardInitialized instanceof OnShardReady) || ready) {
184                 iter.remove();
185                 onShardInitialized.getTimeoutSchedule().cancel();
186                 onShardInitialized.getReplyRunnable().run();
187             }
188         }
189     }
190
191     void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
192         onShardInitializedSet.add(onShardInitialized);
193     }
194
195     void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
196         onShardInitializedSet.remove(onShardInitialized);
197     }
198
199     void setRole(final String newRole) {
200         this.role = newRole;
201
202         notifyOnShardInitializedCallbacks();
203     }
204
205     String getRole() {
206         return role;
207     }
208
209     void setFollowerSyncStatus(final boolean syncStatus) {
210         this.followerSyncStatus = syncStatus;
211     }
212
213     boolean isInSync() {
214         if (RaftState.Follower.name().equals(this.role)) {
215             return followerSyncStatus;
216         } else if (RaftState.Leader.name().equals(this.role)) {
217             return true;
218         }
219
220         return false;
221     }
222
223     boolean setLeaderId(final String newLeaderId) {
224         final boolean changed = !Objects.equals(this.leaderId, newLeaderId);
225         this.leaderId = newLeaderId;
226         if (newLeaderId != null) {
227             this.leaderAvailable = true;
228         }
229         notifyOnShardInitializedCallbacks();
230
231         return changed;
232     }
233
234     String getLeaderId() {
235         return leaderId;
236     }
237
238     void setLeaderAvailable(final boolean leaderAvailable) {
239         this.leaderAvailable = leaderAvailable;
240
241         if (leaderAvailable) {
242             notifyOnShardInitializedCallbacks();
243         }
244     }
245
246     short getLeaderVersion() {
247         return leaderVersion;
248     }
249
250     void setLeaderVersion(final short leaderVersion) {
251         this.leaderVersion = leaderVersion;
252     }
253
254     boolean isActiveMember() {
255         return activeMember;
256     }
257
258     void setActiveMember(final boolean isActiveMember) {
259         this.activeMember = isActiveMember;
260     }
261
262     EffectiveModelContext getSchemaContext() {
263         return schemaContextProvider.getEffectiveModelContext();
264     }
265
266     void setSchemaContext(final EffectiveModelContext schemaContext) {
267         schemaContextProvider.set(requireNonNull(schemaContext));
268     }
269
270     @VisibleForTesting
271     Shard.AbstractBuilder<?, ?> getBuilder() {
272         return builder;
273     }
274
275     @Override
276     public String toString() {
277         return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
278                 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
279                 + leaderId + ", activeMember=" + activeMember + "]";
280     }
281
282
283 }