8ebb7e0cc9bfa7e83b2b5e72e9c3003631687070
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardInformation.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.serialization.Serialization;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import java.util.HashSet;
18 import java.util.Iterator;
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.Optional;
22 import java.util.Set;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.access.concepts.MemberName;
25 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
26 import org.opendaylight.controller.cluster.datastore.Shard;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
30 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
31 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
32 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
33 import org.opendaylight.controller.cluster.raft.RaftState;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
35 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 @VisibleForTesting
40 public final class ShardInformation {
41     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
42
43     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
44     private final Map<String, String> initialPeerAddresses;
45     private final ShardPeerAddressResolver addressResolver;
46     private final ShardIdentifier shardId;
47     private final String shardName;
48
49     // This reference indirection is required to have the ability to update the SchemaContext
50     // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
51     // it from becoming garbage.
52     private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
53     private ActorRef actor;
54
55     private Optional<ReadOnlyDataTree> localShardDataTree;
56     private boolean leaderAvailable = false;
57
58     // flag that determines if the actor is ready for business
59     private boolean actorInitialized = false;
60
61     private boolean followerSyncStatus = false;
62
63     private String role ;
64     private String leaderId;
65     private short leaderVersion;
66
67     private DatastoreContext datastoreContext;
68     private Shard.AbstractBuilder<?, ?> builder;
69     private boolean activeMember = true;
70
71     ShardInformation(final String shardName, final ShardIdentifier shardId,
72             final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
73             final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
74         this.shardName = shardName;
75         this.shardId = shardId;
76         this.initialPeerAddresses = initialPeerAddresses;
77         this.datastoreContext = datastoreContext;
78         this.builder = builder;
79         this.addressResolver = addressResolver;
80     }
81
82     Props newProps() {
83         Props props = requireNonNull(builder).id(shardId).peerAddresses(initialPeerAddresses)
84                 .datastoreContext(datastoreContext).schemaContextProvider(schemaContextProvider).props();
85         builder = null;
86         return props;
87     }
88
89     String getShardName() {
90         return shardName;
91     }
92
93     @VisibleForTesting
94     @Nullable public ActorRef getActor() {
95         return actor;
96     }
97
98     void setActor(final ActorRef actor) {
99         this.actor = actor;
100     }
101
102     ShardIdentifier getShardId() {
103         return shardId;
104     }
105
106     void setLocalDataTree(final Optional<ReadOnlyDataTree> dataTree) {
107         this.localShardDataTree = dataTree;
108     }
109
110     Optional<ReadOnlyDataTree> getLocalShardDataTree() {
111         return localShardDataTree;
112     }
113
114     DatastoreContext getDatastoreContext() {
115         return datastoreContext;
116     }
117
118     void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
119         this.datastoreContext = newDatastoreContext;
120         if (actor != null) {
121             LOG.debug("Sending new DatastoreContext to {}", shardId);
122             actor.tell(this.datastoreContext, sender);
123         }
124     }
125
126     void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
127         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
128
129         if (actor != null) {
130             LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
131                     peerAddress, actor.path());
132
133             actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
134         }
135
136         notifyOnShardInitializedCallbacks();
137     }
138
139     void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
140         if (actor != null) {
141             actor.tell(new PeerDown(memberName, peerId), sender);
142         }
143     }
144
145     void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
146         if (actor != null) {
147             actor.tell(new PeerUp(memberName, peerId), sender);
148         }
149     }
150
151     boolean isShardReady() {
152         return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
153     }
154
155     boolean isShardReadyWithLeaderId() {
156         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
157                 && !RaftState.PreLeader.name().equals(role)
158                 && (isLeader() || addressResolver.resolve(leaderId) != null);
159     }
160
161     boolean isShardInitialized() {
162         return getActor() != null && actorInitialized;
163     }
164
165     boolean isLeader() {
166         return Objects.equals(leaderId, shardId.toString());
167     }
168
169     String getSerializedLeaderActor() {
170         if (isLeader()) {
171             return Serialization.serializedActorPath(getActor());
172         } else {
173             return addressResolver.resolve(leaderId);
174         }
175     }
176
177     void setActorInitialized() {
178         LOG.debug("Shard {} is initialized", shardId);
179
180         this.actorInitialized = true;
181
182         notifyOnShardInitializedCallbacks();
183     }
184
185     private void notifyOnShardInitializedCallbacks() {
186         if (onShardInitializedSet.isEmpty()) {
187             return;
188         }
189
190         boolean ready = isShardReadyWithLeaderId();
191
192         LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
193             ready ? "ready" : "initialized", onShardInitializedSet.size());
194
195         Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
196         while (iter.hasNext()) {
197             OnShardInitialized onShardInitialized = iter.next();
198             if (!(onShardInitialized instanceof OnShardReady) || ready) {
199                 iter.remove();
200                 onShardInitialized.getTimeoutSchedule().cancel();
201                 onShardInitialized.getReplyRunnable().run();
202             }
203         }
204     }
205
206     void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
207         onShardInitializedSet.add(onShardInitialized);
208     }
209
210     void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
211         onShardInitializedSet.remove(onShardInitialized);
212     }
213
214     void setRole(final String newRole) {
215         this.role = newRole;
216
217         notifyOnShardInitializedCallbacks();
218     }
219
220     String getRole() {
221         return role;
222     }
223
224     void setFollowerSyncStatus(final boolean syncStatus) {
225         this.followerSyncStatus = syncStatus;
226     }
227
228     boolean isInSync() {
229         if (RaftState.Follower.name().equals(this.role)) {
230             return followerSyncStatus;
231         } else if (RaftState.Leader.name().equals(this.role)) {
232             return true;
233         }
234
235         return false;
236     }
237
238     boolean setLeaderId(final String newLeaderId) {
239         final boolean changed = !Objects.equals(this.leaderId, newLeaderId);
240         this.leaderId = newLeaderId;
241         if (newLeaderId != null) {
242             this.leaderAvailable = true;
243         }
244         notifyOnShardInitializedCallbacks();
245
246         return changed;
247     }
248
249     String getLeaderId() {
250         return leaderId;
251     }
252
253     void setLeaderAvailable(final boolean leaderAvailable) {
254         this.leaderAvailable = leaderAvailable;
255
256         if (leaderAvailable) {
257             notifyOnShardInitializedCallbacks();
258         }
259     }
260
261     short getLeaderVersion() {
262         return leaderVersion;
263     }
264
265     void setLeaderVersion(final short leaderVersion) {
266         this.leaderVersion = leaderVersion;
267     }
268
269     boolean isActiveMember() {
270         return activeMember;
271     }
272
273     void setActiveMember(final boolean isActiveMember) {
274         this.activeMember = isActiveMember;
275     }
276
277     EffectiveModelContext getSchemaContext() {
278         return schemaContextProvider.getEffectiveModelContext();
279     }
280
281     void setSchemaContext(final EffectiveModelContext schemaContext) {
282         schemaContextProvider.set(requireNonNull(schemaContext));
283     }
284
285     @VisibleForTesting
286     Shard.AbstractBuilder<?, ?> getBuilder() {
287         return builder;
288     }
289
290     @Override
291     public String toString() {
292         return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
293                 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
294                 + leaderId + ", activeMember=" + activeMember + "]";
295     }
296
297
298 }