Cleanup ShardInformation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardInformation.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.serialization.Serialization;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import java.util.HashSet;
18 import java.util.Iterator;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.Set;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.access.concepts.MemberName;
25 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
26 import org.opendaylight.controller.cluster.datastore.Shard;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
30 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
31 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
32 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
33 import org.opendaylight.controller.cluster.raft.RaftState;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
35 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 @VisibleForTesting
40 public final class ShardInformation {
41     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
42
43     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
44     private final Map<String, String> initialPeerAddresses;
45     private final ShardPeerAddressResolver addressResolver;
46     private final ShardIdentifier shardId;
47     private final String shardName;
48
49     // This reference indirection is required to have the ability to update the SchemaContext
50     // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
51     // it from becoming garbage.
52     private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
53     private ActorRef actor;
54
55     private Optional<ReadOnlyDataTree> localShardDataTree;
56     private boolean leaderAvailable = false;
57
58     // flag that determines if the actor is ready for business
59     private boolean actorInitialized = false;
60
61     private boolean followerSyncStatus = false;
62
63     private String role ;
64     private String leaderId;
65     private short leaderVersion;
66
67     private DatastoreContext datastoreContext;
68     private Shard.AbstractBuilder<?, ?> builder;
69     private boolean activeMember = true;
70
71     ShardInformation(final String shardName, final ShardIdentifier shardId,
72             final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
73             final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
74         this.shardName = shardName;
75         this.shardId = shardId;
76         this.initialPeerAddresses = initialPeerAddresses;
77         this.datastoreContext = datastoreContext;
78         this.builder = builder;
79         this.addressResolver = addressResolver;
80     }
81
82     Props newProps() {
83         Props props = requireNonNull(builder).id(shardId).peerAddresses(initialPeerAddresses)
84                 .datastoreContext(datastoreContext).schemaContextProvider(schemaContextProvider).props();
85         builder = null;
86         return props;
87     }
88
89     String getShardName() {
90         return shardName;
91     }
92
93     @VisibleForTesting
94     @Nullable public ActorRef getActor() {
95         return actor;
96     }
97
98     void setActor(final ActorRef actor) {
99         this.actor = actor;
100     }
101
102     ShardIdentifier getShardId() {
103         return shardId;
104     }
105
106     void setLocalDataTree(final Optional<ReadOnlyDataTree> dataTree) {
107         localShardDataTree = dataTree;
108     }
109
110     Optional<ReadOnlyDataTree> getLocalShardDataTree() {
111         return localShardDataTree;
112     }
113
114     DatastoreContext getDatastoreContext() {
115         return datastoreContext;
116     }
117
118     void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
119         datastoreContext = newDatastoreContext;
120         if (actor != null) {
121             LOG.debug("Sending new DatastoreContext to {}", shardId);
122             actor.tell(datastoreContext, sender);
123         }
124     }
125
126     void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
127         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
128
129         if (actor != null) {
130             LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
131                     peerAddress, actor.path());
132
133             actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
134         }
135
136         notifyOnShardInitializedCallbacks();
137     }
138
139     void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
140         if (actor != null) {
141             actor.tell(new PeerDown(memberName, peerId), sender);
142         }
143     }
144
145     void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
146         if (actor != null) {
147             actor.tell(new PeerUp(memberName, peerId), sender);
148         }
149     }
150
151     boolean isShardReady() {
152         return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
153     }
154
155     boolean isShardReadyWithLeaderId() {
156         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
157                 && !RaftState.PreLeader.name().equals(role)
158                 && (isLeader() || addressResolver.resolve(leaderId) != null);
159     }
160
161     boolean isShardInitialized() {
162         return getActor() != null && actorInitialized;
163     }
164
165     boolean isLeader() {
166         return Objects.equals(leaderId, shardId.toString());
167     }
168
169     String getSerializedLeaderActor() {
170         return isLeader() ? Serialization.serializedActorPath(getActor()) : addressResolver.resolve(leaderId);
171     }
172
173     void setActorInitialized() {
174         LOG.debug("Shard {} is initialized", shardId);
175
176         actorInitialized = true;
177
178         notifyOnShardInitializedCallbacks();
179     }
180
181     private void notifyOnShardInitializedCallbacks() {
182         if (onShardInitializedSet.isEmpty()) {
183             return;
184         }
185
186         boolean ready = isShardReadyWithLeaderId();
187
188         LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
189             ready ? "ready" : "initialized", onShardInitializedSet.size());
190
191         Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
192         while (iter.hasNext()) {
193             OnShardInitialized onShardInitialized = iter.next();
194             if (!(onShardInitialized instanceof OnShardReady) || ready) {
195                 iter.remove();
196                 onShardInitialized.getTimeoutSchedule().cancel();
197                 onShardInitialized.getReplyRunnable().run();
198             }
199         }
200     }
201
202     void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
203         onShardInitializedSet.add(onShardInitialized);
204     }
205
206     void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
207         onShardInitializedSet.remove(onShardInitialized);
208     }
209
210     void setRole(final String newRole) {
211         role = newRole;
212
213         notifyOnShardInitializedCallbacks();
214     }
215
216     String getRole() {
217         return role;
218     }
219
220     void setFollowerSyncStatus(final boolean syncStatus) {
221         followerSyncStatus = syncStatus;
222     }
223
224     boolean isInSync() {
225         if (RaftState.Follower.name().equals(role)) {
226             return followerSyncStatus;
227         } else if (RaftState.Leader.name().equals(role)) {
228             return true;
229         }
230
231         return false;
232     }
233
234     boolean setLeaderId(final String newLeaderId) {
235         final boolean changed = !Objects.equals(leaderId, newLeaderId);
236         leaderId = newLeaderId;
237         if (newLeaderId != null) {
238             leaderAvailable = true;
239         }
240         notifyOnShardInitializedCallbacks();
241
242         return changed;
243     }
244
245     String getLeaderId() {
246         return leaderId;
247     }
248
249     void setLeaderAvailable(final boolean leaderAvailable) {
250         this.leaderAvailable = leaderAvailable;
251
252         if (leaderAvailable) {
253             notifyOnShardInitializedCallbacks();
254         }
255     }
256
257     short getLeaderVersion() {
258         return leaderVersion;
259     }
260
261     void setLeaderVersion(final short leaderVersion) {
262         this.leaderVersion = leaderVersion;
263     }
264
265     boolean isActiveMember() {
266         return activeMember;
267     }
268
269     void setActiveMember(final boolean isActiveMember) {
270         activeMember = isActiveMember;
271     }
272
273     EffectiveModelContext getSchemaContext() {
274         return schemaContextProvider.getEffectiveModelContext();
275     }
276
277     void setSchemaContext(final EffectiveModelContext schemaContext) {
278         schemaContextProvider.set(requireNonNull(schemaContext));
279     }
280
281     @VisibleForTesting
282     Shard.AbstractBuilder<?, ?> getBuilder() {
283         return builder;
284     }
285
286     @Override
287     public String toString() {
288         return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
289                 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
290                 + leaderId + ", activeMember=" + activeMember + "]";
291     }
292
293
294 }