From: Robert Varga Date: Thu, 4 May 2017 21:52:57 +0000 (+0200) Subject: Do not retain initial SchemaContext X-Git-Tag: release/carbon-sr1~63 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=7426d405093265655b05c6a3eb197362266edf2e Do not retain initial SchemaContext While looking over a memory dump I have noticed that we retain SchemaContext inside Shard$Builder, which is being retained via Props (which are used to restart the actor). This reference is not updated as the SchemaContext is updated, which means we are wasting memory and are causing Shard to come up with an ancient SchemaContext after a failure. Fix this by having an AtomicReference holder for SchemaContext and have Shard have a Supplier. Change-Id: I73fcae46f249d3679522eb7dbbb059e43c5af6c7 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index fbd5b6456a..c8be1bed4d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -98,6 +98,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailed import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -892,7 +893,7 @@ public class Shard extends RaftActor { private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; - private SchemaContext schemaContext; + private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; private TipProducingDataTree dataTree; private volatile boolean sealed; @@ -928,9 +929,9 @@ public class Shard extends RaftActor { return self(); } - public T schemaContext(final SchemaContext newSchemaContext) { + public T schemaContextProvider(final SchemaContextProvider schemaContextProvider) { checkSealed(); - this.schemaContext = newSchemaContext; + this.schemaContextProvider = Preconditions.checkNotNull(schemaContextProvider); return self(); } @@ -959,7 +960,7 @@ public class Shard extends RaftActor { } public SchemaContext getSchemaContext() { - return schemaContext; + return Verify.verifyNotNull(schemaContextProvider.getSchemaContext()); } public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() { @@ -986,7 +987,7 @@ public class Shard extends RaftActor { Preconditions.checkNotNull(id, "id should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null"); - Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); + Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null"); } public Props props() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java new file mode 100644 index 0000000000..6a4e982440 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/AtomicShardContextProvider.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.shardmanager; + +import com.google.common.base.Verify; +import java.util.concurrent.atomic.AtomicReference; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider; + +final class AtomicShardContextProvider extends AtomicReference implements SchemaContextProvider { + private static final long serialVersionUID = 1L; + + @Override + public SchemaContext getSchemaContext() { + return Verify.verifyNotNull(get()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java index 0c892295a7..8fc77acf96 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -42,7 +42,13 @@ final class ShardInformation { private final ShardPeerAddressResolver addressResolver; private final ShardIdentifier shardId; private final String shardName; + + // This reference indirection is required to have the ability to update the SchemaContext + // inside actor props. Otherwise we would be keeping an old SchemaContext there, preventing + // it from becoming garbage. + private final AtomicShardContextProvider schemaContextProvider = new AtomicShardContextProvider(); private ActorRef actor; + private Optional localShardDataTree; private boolean leaderAvailable = false; @@ -59,9 +65,9 @@ final class ShardInformation { private Shard.AbstractBuilder builder; private boolean isActiveMember = true; - ShardInformation(String shardName, ShardIdentifier shardId, - Map initialPeerAddresses, DatastoreContext datastoreContext, - Shard.AbstractBuilder builder, ShardPeerAddressResolver addressResolver) { + ShardInformation(final String shardName, final ShardIdentifier shardId, + final Map initialPeerAddresses, final DatastoreContext datastoreContext, + final Shard.AbstractBuilder builder, final ShardPeerAddressResolver addressResolver) { this.shardName = shardName; this.shardId = shardId; this.initialPeerAddresses = initialPeerAddresses; @@ -70,10 +76,10 @@ final class ShardInformation { this.addressResolver = addressResolver; } - Props newProps(SchemaContext schemaContext) { + Props newProps() { Preconditions.checkNotNull(builder); Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext) - .schemaContext(schemaContext).props(); + .schemaContextProvider(schemaContextProvider).props(); builder = null; return props; } @@ -87,7 +93,7 @@ final class ShardInformation { return actor; } - void setActor(ActorRef actor) { + void setActor(final ActorRef actor) { this.actor = actor; } @@ -95,7 +101,7 @@ final class ShardInformation { return shardId; } - void setLocalDataTree(Optional localShardDataTree) { + void setLocalDataTree(final Optional localShardDataTree) { this.localShardDataTree = localShardDataTree; } @@ -107,7 +113,7 @@ final class ShardInformation { return datastoreContext; } - void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + void setDatastoreContext(final DatastoreContext datastoreContext, final ActorRef sender) { this.datastoreContext = datastoreContext; if (actor != null) { LOG.debug("Sending new DatastoreContext to {}", shardId); @@ -115,7 +121,7 @@ final class ShardInformation { } } - void updatePeerAddress(String peerId, String peerAddress, ActorRef sender) { + void updatePeerAddress(final String peerId, final String peerAddress, final ActorRef sender) { LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if (actor != null) { @@ -128,13 +134,13 @@ final class ShardInformation { notifyOnShardInitializedCallbacks(); } - void peerDown(MemberName memberName, String peerId, ActorRef sender) { + void peerDown(final MemberName memberName, final String peerId, final ActorRef sender) { if (actor != null) { actor.tell(new PeerDown(memberName, peerId), sender); } } - void peerUp(MemberName memberName, String peerId, ActorRef sender) { + void peerUp(final MemberName memberName, final String peerId, final ActorRef sender) { if (actor != null) { actor.tell(new PeerUp(memberName, peerId), sender); } @@ -195,15 +201,15 @@ final class ShardInformation { } } - void addOnShardInitialized(OnShardInitialized onShardInitialized) { + void addOnShardInitialized(final OnShardInitialized onShardInitialized) { onShardInitializedSet.add(onShardInitialized); } - void removeOnShardInitialized(OnShardInitialized onShardInitialized) { + void removeOnShardInitialized(final OnShardInitialized onShardInitialized) { onShardInitializedSet.remove(onShardInitialized); } - void setRole(String newRole) { + void setRole(final String newRole) { this.role = newRole; notifyOnShardInitializedCallbacks(); @@ -213,7 +219,7 @@ final class ShardInformation { return role; } - void setFollowerSyncStatus(boolean syncStatus) { + void setFollowerSyncStatus(final boolean syncStatus) { this.followerSyncStatus = syncStatus; } @@ -227,7 +233,7 @@ final class ShardInformation { return false; } - boolean setLeaderId(String leaderId) { + boolean setLeaderId(final String leaderId) { final boolean changed = !Objects.equals(this.leaderId, leaderId); this.leaderId = leaderId; if (leaderId != null) { @@ -242,7 +248,7 @@ final class ShardInformation { return leaderId; } - void setLeaderAvailable(boolean leaderAvailable) { + void setLeaderAvailable(final boolean leaderAvailable) { this.leaderAvailable = leaderAvailable; if (leaderAvailable) { @@ -254,7 +260,7 @@ final class ShardInformation { return leaderVersion; } - void setLeaderVersion(short leaderVersion) { + void setLeaderVersion(final short leaderVersion) { this.leaderVersion = leaderVersion; } @@ -262,7 +268,15 @@ final class ShardInformation { return isActiveMember; } - void setActiveMember(boolean isActiveMember) { + void setActiveMember(final boolean isActiveMember) { this.isActiveMember = isActiveMember; } + + SchemaContext getSchemaContext() { + return schemaContextProvider.getSchemaContext(); + } + + void setSchemaContext(final SchemaContext schemaContext) { + schemaContextProvider.set(Preconditions.checkNotNull(schemaContext)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index aa9be490bd..6a91bf79f6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -175,7 +175,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ListenerRegistration configListenerReg = null; private PrefixedShardConfigUpdateHandler configUpdateHandler; - ShardManager(AbstractShardManagerCreator builder) { + ShardManager(final AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); this.configuration = builder.getConfiguration(); this.datastoreContextFactory = builder.getDatastoreContextFactory(); @@ -220,7 +220,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @Override - public void handleCommand(Object message) throws Exception { + public void handleCommand(final Object message) throws Exception { if (message instanceof FindPrimary) { findPrimary((FindPrimary)message); } else if (message instanceof FindLocalShard) { @@ -356,7 +356,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { combinedFutures.onComplete(new OnComplete>() { @Override - public void onComplete(Throwable failure, Iterable results) { + public void onComplete(final Throwable failure, final Iterable results) { LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId()); self().tell(PoisonPill.getInstance(), self()); @@ -379,15 +379,15 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, dispatcher); } - private void onWrappedShardResponse(WrappedShardResponse message) { + private void onWrappedShardResponse(final WrappedShardResponse message) { if (message.getResponse() instanceof RemoveServerReply) { onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(), message.getLeaderPath()); } } - private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, - String leaderPath) { + private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId, + final RemoveServerReply replyMsg, final String leaderPath) { shardReplicaOperationsInProgress.remove(shardId.getShardName()); LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName()); @@ -427,7 +427,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", @@ -445,8 +445,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, - final ActorRef sender) { + private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, + final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; } @@ -467,7 +467,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { shardReplicaOperationsInProgress.remove(shardName); String msg = String.format("RemoveServer request to leader %s for shard %s failed", @@ -485,7 +485,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void onShardReplicaRemoved(ServerRemoved message) { + private void onShardReplicaRemoved(final ServerRemoved message) { removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build()); } @@ -564,7 +564,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreateShard(CreateShard createShard) { + private void onCreateShard(final CreateShard createShard) { LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); Object reply; @@ -624,7 +624,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ActorRef sender = getSender(); stopOnComplete.addOnComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Boolean result) { + public void onComplete(final Throwable failure, final Boolean result) { LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer); self().tell(messageToDefer, sender); } @@ -633,7 +633,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) { + private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, + final String shardName) { configuration.addPrefixShardConfiguration(config); final Builder builder = newShardDatastoreContextBuilder(shardName); @@ -653,7 +654,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(info.getShardName(), info); if (schemaContext != null) { - info.setActor(newShardActor(schemaContext, info)); + info.setSchemaContext(schemaContext); + info.setActor(newShardActor(info)); } } @@ -714,16 +716,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShards.put(info.getShardName(), info); if (schemaContext != null) { - info.setActor(newShardActor(schemaContext, info)); + info.setSchemaContext(schemaContext); + info.setActor(newShardActor(info)); } } - private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) { + private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) { return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)) .shardPeerAddressResolver(peerAddressResolver); } - private DatastoreContext newShardDatastoreContext(String shardName) { + private DatastoreContext newShardDatastoreContext(final String shardName) { return newShardDatastoreContextBuilder(shardName).build(); } @@ -736,7 +739,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) { + private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) { LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged); ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); @@ -753,7 +756,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) { + private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) { ShardInformation shardInfo = message.getShardInfo(); LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(), @@ -770,7 +773,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) { + private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) { LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(), status.getName(), status.isInitialSyncDone()); @@ -784,7 +787,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - private void onRoleChangeNotification(RoleChangeNotification roleChanged) { + private void onRoleChangeNotification(final RoleChangeNotification roleChanged) { LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(), roleChanged.getOldRole(), roleChanged.getNewRole()); @@ -797,7 +800,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - private ShardInformation findShardInformation(String memberId) { + private ShardInformation findShardInformation(final String memberId) { for (ShardInformation info : localShards.values()) { if (info.getShardId().toString().equals(memberId)) { return info; @@ -827,7 +830,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void onActorInitialized(Object message) { + private void onActorInitialized(final Object message) { final ActorRef sender = getSender(); if (sender == null) { @@ -848,7 +851,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { markShardAsInitialized(shardId.getShardName()); } - private void markShardAsInitialized(String shardName) { + private void markShardAsInitialized(final String shardName) { LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName); ShardInformation shardInformation = localShards.get(shardName); @@ -860,7 +863,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @Override - protected void handleRecover(Object message) throws Exception { + protected void handleRecover(final Object message) throws Exception { if (message instanceof RecoveryCompleted) { onRecoveryCompleted(); } else if (message instanceof SnapshotOffer) { @@ -888,8 +891,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { createLocalShards(); } - private void sendResponse(ShardInformation shardInformation, boolean doWait, - boolean wantShardReady, final Supplier messageSupplier) { + private void sendResponse(final ShardInformation shardInformation, final boolean doWait, + final boolean wantShardReady, final Supplier messageSupplier) { if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) { if (doWait) { final ActorRef sender = getSender(); @@ -937,11 +940,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(messageSupplier.get(), getSelf()); } - private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) { + private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) { return new NoShardLeaderException(null, shardId.toString()); } - private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) { + private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) { return new NotInitializedException(String.format( "Found primary shard %s but it's not initialized yet. Please try again later", shardId)); } @@ -951,7 +954,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return MemberName.forName(member.roles().iterator().next()); } - private void memberRemoved(ClusterEvent.MemberRemoved message) { + private void memberRemoved(final ClusterEvent.MemberRemoved message) { MemberName memberName = memberToName(message.member()); LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, @@ -964,7 +967,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void memberExited(ClusterEvent.MemberExited message) { + private void memberExited(final ClusterEvent.MemberExited message) { MemberName memberName = memberToName(message.member()); LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName, @@ -977,7 +980,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void memberUp(ClusterEvent.MemberUp message) { + private void memberUp(final ClusterEvent.MemberUp message) { MemberName memberName = memberToName(message.member()); LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName, @@ -986,12 +989,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberUp(memberName, message.member().address()); } - private void memberUp(MemberName memberName, Address address) { + private void memberUp(final MemberName memberName, final Address address) { addPeerAddress(memberName, address); checkReady(); } - private void memberWeaklyUp(MemberWeaklyUp message) { + private void memberWeaklyUp(final MemberWeaklyUp message) { MemberName memberName = memberToName(message.member()); LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName, @@ -1000,7 +1003,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { memberUp(memberName, message.member().address()); } - private void addPeerAddress(MemberName memberName, Address address) { + private void addPeerAddress(final MemberName memberName, final Address address) { peerAddressResolver.addPeerAddress(memberName, address); for (ShardInformation info : localShards.values()) { @@ -1012,7 +1015,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void memberReachable(ClusterEvent.ReachableMember message) { + private void memberReachable(final ClusterEvent.ReachableMember message) { MemberName memberName = memberToName(message.member()); LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address()); @@ -1021,7 +1024,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { markMemberAvailable(memberName); } - private void memberUnreachable(ClusterEvent.UnreachableMember message) { + private void memberUnreachable(final ClusterEvent.UnreachableMember message) { MemberName memberName = memberToName(message.member()); LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address()); @@ -1058,7 +1061,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onDatastoreContextFactory(DatastoreContextFactory factory) { + private void onDatastoreContextFactory(final DatastoreContextFactory factory) { datastoreContextFactory = factory; for (ShardInformation info : localShards.values()) { info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf()); @@ -1117,9 +1120,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); for (ShardInformation info : localShards.values()) { + info.setSchemaContext(schemaContext); + if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); - info.setActor(newShardActor(schemaContext, info)); + info.setActor(newShardActor(info)); } else { info.getActor().tell(message, getSelf()); } @@ -1132,12 +1137,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @VisibleForTesting - protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) { - return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath), + protected ActorRef newShardActor(final ShardInformation info) { + return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath), info.getShardId().toString()); } - private void findPrimary(FindPrimary message) { + private void findPrimary(final FindPrimary message) { LOG.debug("{}: In findPrimary: {}", persistenceId(), message); final String shardName = message.getShardName(); @@ -1194,7 +1199,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { handler.onFailure(failure); } else { @@ -1218,7 +1223,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName the shard name * @return a b */ - private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) { + private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) { return peerAddressResolver.getShardIdentifier(memberName, shardName); } @@ -1255,7 +1260,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * * @param shardName the shard name */ - private Map getPeerAddresses(String shardName) { + private Map getPeerAddresses(final String shardName) { final Collection members = configuration.getMembersFromShardName(shardName); return getPeerAddresses(shardName, members); } @@ -1324,7 +1329,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override - public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(), response, getSender()); if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { @@ -1333,7 +1338,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @Override - public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); } }); @@ -1364,7 +1369,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override - public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { final RunnableMessage runnable = (RunnableMessage) () -> addShard(getShardName(), response, getSender()); if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { @@ -1373,13 +1378,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @Override - public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); } }); } - private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) { + private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { String msg = String.format("Local shard %s already exists", shardName); LOG.debug("{}: {}", persistenceId(), msg); sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); @@ -1408,8 +1413,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, Shard.builder(), peerAddressResolver); shardInfo.setActiveMember(false); + shardInfo.setSchemaContext(schemaContext); localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + shardInfo.setActor(newShardActor(shardInfo)); } else { removeShardOnFailure = false; shardInfo = existingShardInfo; @@ -1438,8 +1444,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, Shard.builder(), peerAddressResolver); shardInfo.setActiveMember(false); + shardInfo.setSchemaContext(schemaContext); localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + shardInfo.setActor(newShardActor(shardInfo)); } else { removeShardOnFailure = false; shardInfo = existingShardInfo; @@ -1468,7 +1475,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object addServerResponse) { + public void onComplete(final Throwable failure, final Object addServerResponse) { if (failure != null) { LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); @@ -1484,8 +1491,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender, - boolean removeShardOnFailure) { + private void onAddServerFailure(final String shardName, final String message, final Throwable failure, + final ActorRef sender, final boolean removeShardOnFailure) { shardReplicaOperationsInProgress.remove(shardName); if (removeShardOnFailure) { @@ -1499,8 +1506,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { new RuntimeException(message, failure)), getSelf()); } - private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender, - String leaderPath, boolean removeShardOnFailure) { + private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg, + final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) { String shardName = shardInfo.getShardName(); shardReplicaOperationsInProgress.remove(shardName); @@ -1528,29 +1535,23 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private static Exception getServerChangeException(Class serverChange, ServerChangeStatus serverChangeStatus, - String leaderPath, ShardIdentifier shardId) { - Exception failure; + private static Exception getServerChangeException(final Class serverChange, + final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) { switch (serverChangeStatus) { case TIMEOUT: - failure = new TimeoutException(String.format( + return new TimeoutException(String.format( "The shard leader %s timed out trying to replicate the initial data to the new shard %s." + "Possible causes - there was a problem replicating the data or shard leadership changed " + "while replicating the shard data", leaderPath, shardId.getShardName())); - break; case NO_LEADER: - failure = createNoShardLeaderException(shardId); - break; + return createNoShardLeaderException(shardId); case NOT_SUPPORTED: - failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s", + return new UnsupportedOperationException(String.format("%s request is not supported for shard %s", serverChange.getSimpleName(), shardId.getShardName())); - break; default : - failure = new RuntimeException(String.format( - "%s request to leader %s for shard %s failed with status %s", + return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s", serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); } - return failure; } private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) { @@ -1559,12 +1560,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(), shardReplicaMsg.getShardName(), persistenceId(), getSelf()) { @Override - public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override - public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { doRemoveShardReplicaAsync(response.getPrimaryPath()); } @@ -1585,12 +1586,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { @Override - public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { doRemoveShardReplicaAsync(response.getPrimaryPath()); } @Override - public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { doRemoveShardReplicaAsync(response.getPrimaryPath()); } @@ -1619,7 +1620,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return currentSnapshot; } - private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) { + private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) { currentSnapshot = snapshot; LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot); @@ -1643,7 +1644,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) { + private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) { LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", persistenceId()); deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1, @@ -1667,7 +1668,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { localShardFound.getPath(), getSender())); } - private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) { + private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) { LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus); ActorRef sender = getSender(); @@ -1678,7 +1679,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { future.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { sender.tell(new Status.Failure(new RuntimeException( String.format("Failed to access local shard %s", shardName), failure)), self()); @@ -1702,7 +1703,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - private void findLocalShard(FindLocalShard message) { + private void findLocalShard(final FindLocalShard message) { LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName()); final ShardInformation shardInformation = localShards.get(message.getShardName()); @@ -1727,7 +1728,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { if (failure != null) { LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure); @@ -1753,7 +1754,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); } - private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus, + private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus, final String shardName, final ActorRef shardActorRef, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1772,7 +1773,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { futureObj.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { shardReplicaOperationsInProgress.remove(shardName); if (failure != null) { String msg = String.format("ChangeServersVotingStatus request to local shard %s failed", @@ -1809,8 +1810,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String leaderPath; boolean removeShardOnFailure; - ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath, - boolean removeShardOnFailure) { + ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply, + final String leaderPath, final boolean removeShardOnFailure) { this.shardInfo = shardInfo; this.addServerReply = addServerReply; this.leaderPath = leaderPath; @@ -1824,8 +1825,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Throwable failure; boolean removeShardOnFailure; - ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure, - boolean removeShardOnFailure) { + ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure, + final boolean removeShardOnFailure) { this.shardName = shardName; this.failureMessage = failureMessage; this.failure = failure; @@ -1837,7 +1838,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Runnable replyRunnable; private Cancellable timeoutSchedule; - OnShardInitialized(Runnable replyRunnable) { + OnShardInitialized(final Runnable replyRunnable) { this.replyRunnable = replyRunnable; } @@ -1849,13 +1850,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return timeoutSchedule; } - void setTimeoutSchedule(Cancellable timeoutSchedule) { + void setTimeoutSchedule(final Cancellable timeoutSchedule) { this.timeoutSchedule = timeoutSchedule; } } static class OnShardReady extends OnShardInitialized { - OnShardReady(Runnable replyRunnable) { + OnShardReady(final Runnable replyRunnable) { super(replyRunnable); } } @@ -1915,8 +1916,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param persistenceId The persistenceId for the ShardManager * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary */ - protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, - ActorRef shardManagerActor) { + protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName, + final String persistenceId, final ActorRef shardManagerActor) { this.targetActor = Preconditions.checkNotNull(targetActor); this.shardName = Preconditions.checkNotNull(shardName); this.persistenceId = Preconditions.checkNotNull(persistenceId); @@ -1932,14 +1933,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @Override - public void onFailure(Throwable failure) { + public void onFailure(final Throwable failure) { LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure); targetActor.tell(new Status.Failure(new RuntimeException( String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor); } @Override - public void onUnknownResponse(Object response) { + public void onUnknownResponse(final Object response) { String msg = String.format("Failed to find leader for shard %s: received response: %s", shardName, response); LOG.debug("{}: {}", persistenceId, msg); @@ -1956,7 +1957,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Object response; private final String leaderPath; - WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) { + WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) { this.shardId = shardId; this.response = response; this.leaderPath = leaderPath; @@ -1980,7 +1981,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final ShardInformation shardInfo; private final OnShardInitialized onShardInitialized; - ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) { + ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized, + final ActorRef sender) { this.sender = sender; this.shardInfo = shardInfo; this.onShardInitialized = onShardInitialized; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 3a676586eb..59a8ecd972 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -125,7 +125,8 @@ public abstract class AbstractShardTest extends AbstractActorTest { } protected Shard.Builder newShardBuilder() { - return Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).schemaContext(SCHEMA_CONTEXT); + return Shard.builder().id(shardID).datastoreContext(newDatastoreContext()) + .schemaContextProvider(() -> SCHEMA_CONTEXT); } protected void testRecovery(final Set listEntryKeys) throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0bd34aac3e..445116246b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -2020,13 +2020,13 @@ public class ShardTest extends AbstractShardTest { .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder() .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext) - .schemaContext(SCHEMA_CONTEXT).props(); + .schemaContextProvider(() -> SCHEMA_CONTEXT).props(); new ShardTestKit(getSystem()) { { @@ -2174,14 +2174,14 @@ public class ShardTest extends AbstractShardTest { .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) .peerAddresses(Collections.singletonMap(leaderShardID.toString(), "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); final TestActorRef leaderShard = actorFactory .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) .peerAddresses(Collections.singletonMap(followerShardID.toString(), "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); @@ -2293,14 +2293,14 @@ public class ShardTest extends AbstractShardTest { .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()) .peerAddresses(Collections.singletonMap(leaderShardID.toString(), "akka://test/user/" + leaderShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); final TestActorRef leaderShard = actorFactory .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()) .peerAddresses(Collections.singletonMap(followerShardID.toString(), "akka://test/user/" + followerShardID.toString())) - .schemaContext(SCHEMA_CONTEXT).props() + .schemaContextProvider(() -> SCHEMA_CONTEXT).props() .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 233ecb8cf3..881429bee6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -51,7 +51,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private ActorRef createShard() { ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext) - .schemaContext(TestModel.createTestContext()).props()); + .schemaContextProvider(() -> TEST_SCHEMA_CONTEXT).props()); ShardTestKit.waitUntilLeader(shard); return shard; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index f8997106f6..84bc87238a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -50,6 +50,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardTransactionTest extends AbstractActorTest { @@ -59,6 +60,7 @@ public class ShardTransactionTest extends AbstractActorTest { private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.create("inventory", MEMBER_NAME, "config"); + private static final SchemaContext TEST_MODEL = TestModel.createTestContext(); private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build(); @@ -70,7 +72,7 @@ public class ShardTransactionTest extends AbstractActorTest { @Before public void setUp() { shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext) - .schemaContext(TestModel.createTestContext()).props() + .schemaContextProvider(() -> TEST_MODEL).props() .withDispatcher(Dispatchers.DefaultDispatcherId())); ShardTestKit.waitUntilLeader(shard); store = shard.underlyingActor().getDataStore(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index c61bbc10d3..cfef02c509 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -1223,25 +1223,26 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.emptyMap(), LOCAL_MEMBER_NAME); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, + final String memberName) { return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Props newShardProps(ShardIdentifier shardId, Map peers, String memberName, - EntityOwnerSelectionStrategyConfig config) { + private Props newShardProps(final ShardIdentifier shardId, final Map peers, final String memberName, + final EntityOwnerSelectionStrategyConfig config) { return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props() .withDispatcher(Dispatchers.DefaultDispatcherId()); } - private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map peers, - String memberName) { + private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map peers, + final String memberName) { return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext( - dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName( + dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName( MemberName.forName(memberName)).ownerSelectionStrategyConfig( EntityOwnerSelectionStrategyConfig.newBuilder().build()); } - private Map peerMap(String... peerIds) { + private Map peerMap(final String... peerIds) { ImmutableMap.Builder builder = ImmutableMap.builder(); for (String peerId: peerIds) { builder.put(peerId, actorFactory.createTestActorPath(peerId)).build(); @@ -1254,14 +1255,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { private final TestActorRef collectorActor; private final Map, Predicate> dropMessagesOfType = new ConcurrentHashMap<>(); - TestEntityOwnershipShard(Builder builder, TestActorRef collectorActor) { + TestEntityOwnershipShard(final Builder builder, final TestActorRef collectorActor) { super(builder); this.collectorActor = collectorActor; } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void handleCommand(Object message) { + public void handleCommand(final Object message) { Predicate drop = dropMessagesOfType.get(message.getClass()); if (drop == null || !drop.test(message)) { super.handleCommand(message); @@ -1272,15 +1273,15 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { } } - void startDroppingMessagesOfType(Class msgClass) { + void startDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.put(msgClass, msg -> true); } - void startDroppingMessagesOfType(Class msgClass, Predicate filter) { + void startDroppingMessagesOfType(final Class msgClass, final Predicate filter) { dropMessagesOfType.put(msgClass, filter); } - void stopDroppingMessagesOfType(Class msgClass) { + void stopDroppingMessagesOfType(final Class msgClass) { dropMessagesOfType.remove(msgClass); } @@ -1288,11 +1289,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { return collectorActor; } - static Props props(Builder builder) { + static Props props(final Builder builder) { return props(builder, null); } - static Props props(Builder builder, TestActorRef collectorActor) { + static Props props(final Builder builder, final TestActorRef collectorActor) { return Props.create(TestEntityOwnershipShard.class, builder, collectorActor) .withDispatcher(Dispatchers.DefaultDispatcherId()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java index 5efd094702..7f415d7dc7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManagerTest.java @@ -137,11 +137,11 @@ public class ShardManagerTest extends AbstractShardManagerTest { private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); - private ActorSystem newActorSystem(String config) { + private ActorSystem newActorSystem(final String config) { return newActorSystem("cluster-test", config); } - private ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) { + private ActorRef newMockShardActor(final ActorSystem system, final String shardName, final String memberName) { String name = ShardIdentifier.create(shardName, MemberName.forName(memberName), "config").toString(); if (system == getSystem()) { return actorFactory.createActor(Props.create(MessageCollectorActor.class), name); @@ -154,7 +154,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { return newShardMgrProps(new MockConfiguration()); } - private static DatastoreContextFactory newDatastoreContextFactory(DatastoreContext datastoreContext) { + private static DatastoreContextFactory newDatastoreContextFactory(final DatastoreContext datastoreContext) { DatastoreContextFactory mockFactory = mock(DatastoreContextFactory.class); Mockito.doReturn(datastoreContext).when(mockFactory).getBaseDatastoreContext(); Mockito.doReturn(datastoreContext).when(mockFactory).getShardDatastoreContext(Mockito.anyString()); @@ -165,7 +165,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { return newTestShardMgrBuilderWithMockShardActor(mockShardActor); } - private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(ActorRef shardActor) { + private TestShardManager.Builder newTestShardMgrBuilderWithMockShardActor(final ActorRef shardActor) { return TestShardManager.builder(datastoreContextBuilder).shardActor(shardActor) .distributedDataStore(mock(DistributedDataStore.class)); } @@ -176,7 +176,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { Dispatchers.DefaultDispatcherId()); } - private Props newPropsShardMgrWithMockShardActor(ActorRef shardActor) { + private Props newPropsShardMgrWithMockShardActor(final ActorRef shardActor) { return newTestShardMgrBuilderWithMockShardActor(shardActor).props() .withDispatcher(Dispatchers.DefaultDispatcherId()); } @@ -186,14 +186,15 @@ public class ShardManagerTest extends AbstractShardManagerTest { return newTestShardManager(newShardMgrProps()); } - private TestShardManager newTestShardManager(Props props) { + private TestShardManager newTestShardManager(final Props props) { TestActorRef shardManagerActor = actorFactory.createTestActor(props); TestShardManager shardManager = shardManagerActor.underlyingActor(); shardManager.waitForRecoveryComplete(); return shardManager; } - private static void waitForShardInitialized(ActorRef shardManager, String shardName, JavaTestKit kit) { + private static void waitForShardInitialized(final ActorRef shardManager, final String shardName, + final JavaTestKit kit) { AssertionError last = null; Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { @@ -212,7 +213,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } @SuppressWarnings("unchecked") - private static T expectMsgClassOrFailure(Class msgClass, JavaTestKit kit, String msg) { + private static T expectMsgClassOrFailure(final Class msgClass, final JavaTestKit kit, final String msg) { Object reply = kit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), msgClass, Failure.class); if (reply instanceof Failure) { throw new AssertionError(msg + " failed", ((Failure)reply).cause()); @@ -237,12 +238,12 @@ public class ShardManagerTest extends AbstractShardManagerTest { final MockConfiguration mockConfig = new MockConfiguration() { @Override - public Collection getMemberShardNames(MemberName memberName) { + public Collection getMemberShardNames(final MemberName memberName) { return Arrays.asList("default", "topology"); } @Override - public Collection getMembersFromShardName(String shardName) { + public Collection getMembersFromShardName(final String shardName) { return members("member-1"); } }; @@ -260,12 +261,12 @@ public class ShardManagerTest extends AbstractShardManagerTest { final PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); final CountDownLatch newShardActorLatch = new CountDownLatch(2); class LocalShardManager extends ShardManager { - LocalShardManager(AbstractShardManagerCreator creator) { + LocalShardManager(final AbstractShardManagerCreator creator) { super(creator); } @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + protected ActorRef newShardActor(final ShardInformation info) { Entry entry = shardInfoMap.get(info.getShardName()); ActorRef ref = null; if (entry != null) { @@ -1141,7 +1142,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { LOG.info("testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards starting"); TestShardManager shardManager = newTestShardManager(newShardMgrProps(new MockConfiguration() { @Override - public List getMemberShardNames(MemberName memberName) { + public List getMemberShardNames(final MemberName memberName) { return Arrays.asList("default", "astronauts"); } })); @@ -1201,7 +1202,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { }; } - private static List members(String... names) { + private static List members(final String... names) { return Arrays.asList(names).stream().map(MemberName::forName).collect(Collectors.toList()); } @@ -2112,14 +2113,14 @@ public class ShardManagerTest extends AbstractShardManagerTest { private CountDownLatch memberReachableReceived = new CountDownLatch(1); private volatile MessageInterceptor messageInterceptor; - private TestShardManager(Builder builder) { + private TestShardManager(final Builder builder) { super(builder); shardActor = builder.shardActor; shardActors = builder.shardActors; } @Override - protected void handleRecover(Object message) throws Exception { + protected void handleRecover(final Object message) throws Exception { try { super.handleRecover(message); } finally { @@ -2129,14 +2130,14 @@ public class ShardManagerTest extends AbstractShardManagerTest { } } - private void countDownIfOther(final Member member, CountDownLatch latch) { + private void countDownIfOther(final Member member, final CountDownLatch latch) { if (!getCluster().getCurrentMemberName().equals(memberToName(member))) { latch.countDown(); } } @Override - public void handleCommand(Object message) throws Exception { + public void handleCommand(final Object message) throws Exception { try { if (messageInterceptor != null && messageInterceptor.canIntercept(message)) { getSender().tell(messageInterceptor.apply(message), getSelf()); @@ -2158,7 +2159,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } } - void setMessageInterceptor(MessageInterceptor messageInterceptor) { + void setMessageInterceptor(final MessageInterceptor messageInterceptor) { this.messageInterceptor = messageInterceptor; } @@ -2198,7 +2199,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { findPrimaryMessageReceived = new CountDownLatch(1); } - public static Builder builder(DatastoreContext.Builder datastoreContextBuilder) { + public static Builder builder(final DatastoreContext.Builder datastoreContextBuilder) { return new Builder(datastoreContextBuilder); } @@ -2206,37 +2207,37 @@ public class ShardManagerTest extends AbstractShardManagerTest { private ActorRef shardActor; private final Map shardActors = new HashMap<>(); - Builder(DatastoreContext.Builder datastoreContextBuilder) { + Builder(final DatastoreContext.Builder datastoreContextBuilder) { super(TestShardManager.class); datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())); } - Builder shardActor(ActorRef newShardActor) { + Builder shardActor(final ActorRef newShardActor) { this.shardActor = newShardActor; return this; } - Builder addShardActor(String shardName, ActorRef actorRef) { + Builder addShardActor(final String shardName, final ActorRef actorRef) { shardActors.put(shardName, actorRef); return this; } } @Override - public void saveSnapshot(Object obj) { + public void saveSnapshot(final Object obj) { snapshot = (ShardManagerSnapshot) obj; snapshotPersist.countDown(); super.saveSnapshot(obj); } - void verifySnapshotPersisted(Set shardList) { + void verifySnapshotPersisted(final Set shardList) { assertEquals("saveSnapshot invoked", true, Uninterruptibles.awaitUninterruptibly(snapshotPersist, 5, TimeUnit.SECONDS)); assertEquals("Shard Persisted", shardList, Sets.newHashSet(snapshot.getShardList())); } @Override - protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) { + protected ActorRef newShardActor(final ShardInformation info) { if (shardActors.get(info.getShardName()) != null) { return shardActors.get(info.getShardName()); } @@ -2245,7 +2246,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { return shardActor; } - return super.newShardActor(schemaContext, info); + return super.newShardActor(info); } } @@ -2253,7 +2254,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { extends AbstractShardManagerCreator { private final Class shardManagerClass; - AbstractGenericCreator(Class shardManagerClass) { + AbstractGenericCreator(final Class shardManagerClass) { this.shardManagerClass = shardManagerClass; cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).waitTillReadyCountDownLatch(ready) .primaryShardInfoCache(new PrimaryShardInfoFutureCache()); @@ -2267,7 +2268,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { } private static class GenericCreator extends AbstractGenericCreator, C> { - GenericCreator(Class shardManagerClass) { + GenericCreator(final Class shardManagerClass) { super(shardManagerClass); } } @@ -2276,7 +2277,7 @@ public class ShardManagerTest extends AbstractShardManagerTest { private static final long serialVersionUID = 1L; private final Creator delegate; - DelegatingShardManagerCreator(Creator delegate) { + DelegatingShardManagerCreator(final Creator delegate) { this.delegate = delegate; } @@ -2293,12 +2294,12 @@ public class ShardManagerTest extends AbstractShardManagerTest { private static MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) { return new MessageInterceptor() { @Override - public Object apply(Object message) { + public Object apply(final Object message) { return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1); } @Override - public boolean canIntercept(Object message) { + public boolean canIntercept(final Object message) { return message instanceof FindPrimary; } }; @@ -2311,13 +2312,13 @@ public class ShardManagerTest extends AbstractShardManagerTest { private final Class requestClass; @SuppressWarnings("unused") - MockRespondActor(Class requestClass, Object responseMsg) { + MockRespondActor(final Class requestClass, final Object responseMsg) { this.requestClass = requestClass; this.responseMsg = responseMsg; } @Override - public void onReceive(Object message) throws Exception { + public void onReceive(final Object message) throws Exception { if (message.equals(CLEAR_RESPONSE)) { responseMsg = null; } else {