Add unit test for FrontedMetadata memory leaks
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardInformation.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.serialization.Serialization;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Strings;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.Map;
21 import java.util.Objects;
22 import java.util.Optional;
23 import java.util.Set;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.controller.cluster.access.concepts.MemberName;
26 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
27 import org.opendaylight.controller.cluster.datastore.Shard;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
31 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
32 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardInitialized;
33 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager.OnShardReady;
34 import org.opendaylight.controller.cluster.raft.RaftState;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 final class ShardInformation {
41     private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
42
43     private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
44     private final Map<String, String> initialPeerAddresses;
45     private final ShardPeerAddressResolver addressResolver;
46     private final ShardIdentifier shardId;
47     private final String shardName;
48
49     // This reference indirection is required to have the ability to update the SchemaContext
50     // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing
51     // it from becoming garbage.
52     private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider();
53     private ActorRef actor;
54
55     private Optional<ReadOnlyDataTree> localShardDataTree;
56     private boolean leaderAvailable = false;
57
58     // flag that determines if the actor is ready for business
59     private boolean actorInitialized = false;
60
61     private boolean followerSyncStatus = false;
62
63     private String role ;
64     private String leaderId;
65     private short leaderVersion;
66
67     private DatastoreContext datastoreContext;
68     private Shard.AbstractBuilder<?, ?> builder;
69     private boolean activeMember = true;
70
71     ShardInformation(final String shardName, final ShardIdentifier shardId,
72             final Map<String, String> initialPeerAddresses, final DatastoreContext datastoreContext,
73             final Shard.AbstractBuilder<?, ?> builder, final ShardPeerAddressResolver addressResolver) {
74         this.shardName = shardName;
75         this.shardId = shardId;
76         this.initialPeerAddresses = initialPeerAddresses;
77         this.datastoreContext = datastoreContext;
78         this.builder = builder;
79         this.addressResolver = addressResolver;
80     }
81
82     Props newProps() {
83         Props props = requireNonNull(builder).id(shardId).peerAddresses(initialPeerAddresses)
84                 .datastoreContext(datastoreContext).schemaContextProvider(schemaContextProvider).props();
85         builder = null;
86         return props;
87     }
88
89     String getShardName() {
90         return shardName;
91     }
92
93     @Nullable ActorRef getActor() {
94         return actor;
95     }
96
97     void setActor(final ActorRef actor) {
98         this.actor = actor;
99     }
100
101     ShardIdentifier getShardId() {
102         return shardId;
103     }
104
105     void setLocalDataTree(final Optional<ReadOnlyDataTree> dataTree) {
106         this.localShardDataTree = dataTree;
107     }
108
109     Optional<ReadOnlyDataTree> getLocalShardDataTree() {
110         return localShardDataTree;
111     }
112
113     DatastoreContext getDatastoreContext() {
114         return datastoreContext;
115     }
116
117     void setDatastoreContext(final DatastoreContext newDatastoreContext, final ActorRef sender) {
118         this.datastoreContext = newDatastoreContext;
119         if (actor != null) {
120             LOG.debug("Sending new DatastoreContext to {}", shardId);
121             actor.tell(this.datastoreContext, sender);
122         }
123     }
124
125     void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) {
126         LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
127
128         if (actor != null) {
129             LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
130                     peerAddress, actor.path());
131
132             actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
133         }
134
135         notifyOnShardInitializedCallbacks();
136     }
137
138     void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) {
139         if (actor != null) {
140             actor.tell(new PeerDown(memberName, peerId), sender);
141         }
142     }
143
144     void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) {
145         if (actor != null) {
146             actor.tell(new PeerUp(memberName, peerId), sender);
147         }
148     }
149
150     boolean isShardReady() {
151         return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
152     }
153
154     boolean isShardReadyWithLeaderId() {
155         return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
156                 && !RaftState.PreLeader.name().equals(role)
157                 && (isLeader() || addressResolver.resolve(leaderId) != null);
158     }
159
160     boolean isShardInitialized() {
161         return getActor() != null && actorInitialized;
162     }
163
164     boolean isLeader() {
165         return Objects.equals(leaderId, shardId.toString());
166     }
167
168     String getSerializedLeaderActor() {
169         if (isLeader()) {
170             return Serialization.serializedActorPath(getActor());
171         } else {
172             return addressResolver.resolve(leaderId);
173         }
174     }
175
176     void setActorInitialized() {
177         LOG.debug("Shard {} is initialized", shardId);
178
179         this.actorInitialized = true;
180
181         notifyOnShardInitializedCallbacks();
182     }
183
184     private void notifyOnShardInitializedCallbacks() {
185         if (onShardInitializedSet.isEmpty()) {
186             return;
187         }
188
189         boolean ready = isShardReadyWithLeaderId();
190
191         LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
192             ready ? "ready" : "initialized", onShardInitializedSet.size());
193
194         Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
195         while (iter.hasNext()) {
196             OnShardInitialized onShardInitialized = iter.next();
197             if (!(onShardInitialized instanceof OnShardReady) || ready) {
198                 iter.remove();
199                 onShardInitialized.getTimeoutSchedule().cancel();
200                 onShardInitialized.getReplyRunnable().run();
201             }
202         }
203     }
204
205     void addOnShardInitialized(final OnShardInitialized onShardInitialized) {
206         onShardInitializedSet.add(onShardInitialized);
207     }
208
209     void removeOnShardInitialized(final OnShardInitialized onShardInitialized) {
210         onShardInitializedSet.remove(onShardInitialized);
211     }
212
213     void setRole(final String newRole) {
214         this.role = newRole;
215
216         notifyOnShardInitializedCallbacks();
217     }
218
219     String getRole() {
220         return role;
221     }
222
223     void setFollowerSyncStatus(final boolean syncStatus) {
224         this.followerSyncStatus = syncStatus;
225     }
226
227     boolean isInSync() {
228         if (RaftState.Follower.name().equals(this.role)) {
229             return followerSyncStatus;
230         } else if (RaftState.Leader.name().equals(this.role)) {
231             return true;
232         }
233
234         return false;
235     }
236
237     boolean setLeaderId(final String newLeaderId) {
238         final boolean changed = !Objects.equals(this.leaderId, newLeaderId);
239         this.leaderId = newLeaderId;
240         if (newLeaderId != null) {
241             this.leaderAvailable = true;
242         }
243         notifyOnShardInitializedCallbacks();
244
245         return changed;
246     }
247
248     String getLeaderId() {
249         return leaderId;
250     }
251
252     void setLeaderAvailable(final boolean leaderAvailable) {
253         this.leaderAvailable = leaderAvailable;
254
255         if (leaderAvailable) {
256             notifyOnShardInitializedCallbacks();
257         }
258     }
259
260     short getLeaderVersion() {
261         return leaderVersion;
262     }
263
264     void setLeaderVersion(final short leaderVersion) {
265         this.leaderVersion = leaderVersion;
266     }
267
268     boolean isActiveMember() {
269         return activeMember;
270     }
271
272     void setActiveMember(final boolean isActiveMember) {
273         this.activeMember = isActiveMember;
274     }
275
276     SchemaContext getSchemaContext() {
277         return schemaContextProvider.getSchemaContext();
278     }
279
280     void setSchemaContext(final SchemaContext schemaContext) {
281         schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
282     }
283
284     @VisibleForTesting
285     Shard.AbstractBuilder<?, ?> getBuilder() {
286         return builder;
287     }
288
289     @Override
290     public String toString() {
291         return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
292                 + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
293                 + leaderId + ", activeMember=" + activeMember + "]";
294     }
295
296
297 }