2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.serialization.Serialization;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Strings;
15 import java.util.HashSet;
16 import java.util.Iterator;
18 import java.util.Objects;
19 import java.util.Optional;
21 import javax.annotation.Nullable;
22 import org.opendaylight.controller.cluster.access.concepts.MemberName;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
24 import org.opendaylight.controller.cluster.datastore.Shard;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
27 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
28 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
29 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
30 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 final class ShardInformation {
// Logger shared by all instances of this class.
38 private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
// Callbacks registered through addOnShardInitialized() and fired by notifyOnShardInitializedCallbacks().
40 private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
// Peer addresses supplied at construction and handed to the shard builder.
41 private final Map<String, String> initialPeerAddresses;
// Used to resolve a leader id to an address.
42 private final ShardPeerAddressResolver addressResolver;
43 private final ShardIdentifier shardId;
44 private final String shardName;
46 // This reference indirection is required to have the ability to update the SchemaContext
47 // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
48 // it from becoming garbage.
49 private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
// The shard actor that messages are sent to via tell(); not assigned by the constructor.
50 private ActorRef actor;
// No initializer and not assigned by the constructor; written by setLocalDataTree().
52 private Optional<DataTree> localShardDataTree;
// Written by setLeaderAvailable(); also forced to true by setLeaderId() when given a non-null id.
53 private boolean leaderAvailable = false;
55 // flag that determines if the actor is ready for business
56 private boolean actorInitialized = false;
// Last value passed to setFollowerSyncStatus().
58 private boolean followerSyncStatus = false;
61 private String leaderId;
62 private short leaderVersion;
// Initial value comes from the constructor; replaced by setDatastoreContext().
64 private DatastoreContext datastoreContext;
// Shard builder supplied at construction; used to create the actor Props.
65 private Shard.AbstractBuilder<?, ?> builder;
66 private boolean activeMember = true;
/**
 * Creates the bookkeeping record for a single shard. All arguments are stored as given; no validation
 * is performed here.
 *
 * @param shardName name of the shard
 * @param shardId identifier of the shard
 * @param initialPeerAddresses peer addresses that are handed to the shard builder
 * @param datastoreContext datastore context initially in effect for the shard
 * @param builder shard builder used to create the actor {@code Props}
 * @param addressResolver resolver used to map a leader id to an address
 */
ShardInformation(final String shardName, final ShardIdentifier shardId,
        final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
        final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
    // Identity of the shard.
    this.shardId = shardId;
    this.shardName = shardName;

    // Inputs needed to build the shard actor.
    this.builder = builder;
    this.datastoreContext = datastoreContext;
    this.initialPeerAddresses = initialPeerAddresses;

    // Collaborator used for leader address lookups.
    this.addressResolver = addressResolver;
}
80 Preconditions.checkNotNull(builder);
81 Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext)
82 .schemaContextProvider(schemaContextProvider).props();
/**
 * Accessor for the shard name.
 * NOTE(review): the body is not visible in this excerpt; presumably returns {@code shardName} - confirm.
 */
87 String getShardName() {
/**
 * Setter for the shard actor reference.
 * NOTE(review): the body is not visible in this excerpt; presumably assigns the {@code actor} field - confirm.
 *
 * @param actor the actor reference for this shard
 */
96 void setActor(final ActorRef actor) {
/**
 * Accessor for the shard identifier.
 * NOTE(review): the body is not visible in this excerpt; presumably returns {@code shardId} - confirm.
 */
100 ShardIdentifier getShardId() {
104 void setLocalDataTree(final Optional<DataTree> dataTree) {
105 this.localShardDataTree = dataTree;
108 Optional<DataTree> getLocalShardDataTree() {
109 return localShardDataTree;
112 DatastoreContext getDatastoreContext() {
113 return datastoreContext;
/**
 * Stores a new datastore context and forwards it to the shard actor.
 * NOTE(review): one line between the assignment and the debug log is not visible in this excerpt -
 * presumably a guard on the actor being set; confirm.
 *
 * @param newDatastoreContext the context to store and forward
 * @param sender actor reference passed as the sender of the forwarded message
 */
116 void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
117 this.datastoreContext = newDatastoreContext;
119 LOG.debug("Sending new DatastoreContext to {}", shardId);
// The stored context object itself is the message.
120 actor.tell(this.datastoreContext, sender);
/**
 * Sends a {@code PeerAddressResolved} message for the given peer to the shard actor and then re-evaluates
 * the pending OnShardInitialized callbacks.
 * NOTE(review): several lines of this method are not visible in this excerpt - presumably a guard on the
 * actor being set around the debug log and the tell; confirm.
 *
 * @param peerId id of the peer whose address was resolved
 * @param peerAddress the resolved address
 * @param sender actor reference passed as the sender of the message
 */
124 void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
125 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
128 LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
129 peerAddress, actor.path());
131 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
// Re-check waiting callbacks after the address update.
134 notifyOnShardInitializedCallbacks();
/**
 * Sends a {@code PeerDown} message for the given member and peer to the shard actor.
 * NOTE(review): the line between the header and the tell is not visible in this excerpt - presumably a
 * guard on the actor being set; confirm.
 *
 * @param memberName name of the member the peer belongs to
 * @param peerId id of the peer
 * @param sender actor reference passed as the sender of the message
 */
137 void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
139 actor.tell(new PeerDown(memberName, peerId), sender);
/**
 * Sends a {@code PeerUp} message for the given member and peer to the shard actor.
 * NOTE(review): the line between the header and the tell is not visible in this excerpt - presumably a
 * guard on the actor being set; confirm.
 *
 * @param memberName name of the member the peer belongs to
 * @param peerId id of the peer
 * @param sender actor reference passed as the sender of the message
 */
143 void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
145 actor.tell(new PeerUp(memberName, peerId), sender);
149 boolean isShardReady() {
150 return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
153 boolean isShardReadyWithLeaderId() {
154 return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
155 && !RaftState.PreLeader.name().equals(role)
156 && (isLeader() || addressResolver.resolve(leaderId) != null);
159 boolean isShardInitialized() {
160 return getActor() != null && actorInitialized;
164 return Objects.equals(leaderId, shardId.toString());
/**
 * Produces an address string for the current leader: either the serialized path of this shard's actor or
 * the address the resolver returns for {@code leaderId}.
 * NOTE(review): the condition selecting between the two returns is not visible in this excerpt -
 * presumably isLeader(); confirm.
 */
167 String getSerializedLeaderActor() {
169 return Serialization.serializedActorPath(getActor());
171 return addressResolver.resolve(leaderId);
175 void setActorInitialized() {
176 LOG.debug("Shard {} is initialized", shardId);
178 this.actorInitialized = true;
180 notifyOnShardInitializedCallbacks();
/**
 * Fires the registered OnShardInitialized callbacks that are currently eligible. A callback that is not an
 * {@code OnShardReady} is always eligible; an {@code OnShardReady} is eligible only while
 * isShardReadyWithLeaderId() is true.
 * NOTE(review): lines are missing from this excerpt after the empty check and at the top of the inner
 * {@code if} - presumably an early return and removal of the fired callback via the iterator; confirm.
 */
183 private void notifyOnShardInitializedCallbacks() {
// Nothing is registered.
184 if (onShardInitializedSet.isEmpty()) {
// Evaluated once up front and reused for every callback in the loop.
188 boolean ready = isShardReadyWithLeaderId();
190 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
191 ready ? "ready" : "initialized", onShardInitializedSet.size());
193 Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
194 while (iter.hasNext()) {
195 OnShardInitialized onShardInitialized = iter.next();
196 if (!(onShardInitialized instanceof OnShardReady) || ready) {
// Cancel the callback's timeout before running its reply.
198 onShardInitialized.getTimeoutSchedule().cancel();
199 onShardInitialized.getReplyRunnable().run();
204 void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
205 onShardInitializedSet.add(onShardInitialized);
208 void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
209 onShardInitializedSet.remove(onShardInitialized);
/**
 * Handles a role change and re-evaluates the pending OnShardInitialized callbacks.
 * NOTE(review): the line between the header and the notification is not visible in this excerpt -
 * presumably stores {@code newRole} in the {@code role} field; confirm.
 *
 * @param newRole the new role name
 */
212 void setRole(final String newRole) {
215 notifyOnShardInitializedCallbacks();
222 void setFollowerSyncStatus(final boolean syncStatus) {
223 this.followerSyncStatus = syncStatus;
227 if (RaftState.Follower.name().equals(this.role)) {
228 return followerSyncStatus;
229 } else if (RaftState.Leader.name().equals(this.role)) {
/**
 * Records a new leader id, marks a leader as available when the id is non-null, and re-evaluates the
 * pending OnShardInitialized callbacks.
 * NOTE(review): the return statement is not visible in this excerpt - presumably returns {@code changed},
 * i.e. whether the id differs from the previous one; confirm.
 *
 * @param newLeaderId the new leader id; may be {@code null}
 */
236 boolean setLeaderId(final String newLeaderId) {
// Compared before the field is overwritten so the previous value is still available.
237 final boolean changed = !Objects.equals(this.leaderId, newLeaderId);
238 this.leaderId = newLeaderId;
// A null id leaves leaderAvailable untouched.
239 if (newLeaderId != null) {
240 this.leaderAvailable = true;
242 notifyOnShardInitializedCallbacks();
/**
 * Accessor for the leader id.
 * NOTE(review): the body is not visible in this excerpt; presumably returns {@code leaderId} - confirm.
 */
247 String getLeaderId() {
251 void setLeaderAvailable(final boolean leaderAvailable) {
252 this.leaderAvailable = leaderAvailable;
254 if (leaderAvailable) {
255 notifyOnShardInitializedCallbacks();
259 short getLeaderVersion() {
260 return leaderVersion;
263 void setLeaderVersion(final short leaderVersion) {
264 this.leaderVersion = leaderVersion;
/**
 * Accessor for the active-member flag.
 * NOTE(review): the body is not visible in this excerpt; presumably returns {@code activeMember} - confirm.
 */
267 boolean isActiveMember() {
271 void setActiveMember(final boolean isActiveMember) {
272 this.activeMember = isActiveMember;
275 SchemaContext getSchemaContext() {
276 return schemaContextProvider.getSchemaContext();
279 void setSchemaContext(final SchemaContext schemaContext) {
280 schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
284 public String toString() {
285 return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
286 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
287 + leaderId + ", activeMember=" + activeMember + "]";