X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=88f818f0faedf76f0349ce1f7294dee37b9d79d1;hb=refs%2Fchanges%2F49%2F12649%2F4;hp=e68628dbf5c1fc992afb30ae986fff8ed8f6eef1;hpb=1b8f7c7beaed83797320686bebddd536637aed9a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e68628dbf5..88f818f0fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,9 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -90,22 +93,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Collection knownModules = new HashSet<>(128); + private final DataPersistenceProvider dataPersistenceProvider; + /** * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational */ - private ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { this.type = Preconditions.checkNotNull(type, "type should not be null"); this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); this.datastoreContext = datastoreContext; + this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent()); // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - //createLocalShards(null); + createLocalShards(); + } + + protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { + return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider(); } public static Props props(final String type, @@ -123,8 +133,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { - findPrimary( - FindPrimary.fromSerializable(message)); + findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -160,49 +169,76 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { markShardAsInitialized(shardId.getShardName()); } - @VisibleForTesting protected void markShardAsInitialized(String shardName) { + private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { - shardInformation.setShardInitialized(true); + shardInformation.setActorInitialized(); } } - @Override protected void handleRecover(Object message) throws Exception { + @Override + protected void handleRecover(Object message) throws Exception { + if(dataPersistenceProvider.isRecoveryApplicable()) { + if (message instanceof SchemaContextModules) { + SchemaContextModules msg = (SchemaContextModules) message; + knownModules.clear(); + knownModules.addAll(msg.getModules()); + } else if (message instanceof RecoveryFailure) { + RecoveryFailure failure = (RecoveryFailure) message; + LOG.error(failure.cause(), "Recovery failed"); + } else if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); + + // Delete all the messages from the akka journal except the last one + deleteMessages(lastSequenceNr() - 1); + } + } else { + if (message instanceof RecoveryCompleted) { + LOG.info("Recovery complete : {}", persistenceId()); - if(message instanceof SchemaContextModules){ - SchemaContextModules msg = (SchemaContextModules) message; - knownModules.clear(); - knownModules.addAll(msg.getModules()); - } else if(message instanceof RecoveryFailure){ - RecoveryFailure failure = (RecoveryFailure) message; - LOG.error(failure.cause(), "Recovery failed"); - } else if(message instanceof RecoveryCompleted){ - LOG.info("Recovery complete : {}", persistenceId()); - - // Delete all the messages from the akka journal except the last one - deleteMessages(lastSequenceNr() - 1); + // Delete all the messages from the akka journal + deleteMessages(lastSequenceNr()); + } } } private void findLocalShard(FindLocalShard message) { - ShardInformation shardInformation = localShards.get(message.getShardName()); + final ShardInformation shardInformation = localShards.get(message.getShardName()); if(shardInformation == null){ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; } - sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor())); + sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + @Override + public Object get() { + return new LocalShardFound(shardInformation.getActor()); + } + }); } - private void sendResponse(ShardInformation shardInformation, Object message) { + private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, + final Supplier messageSupplier) { if (!shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + if(waitUntilInitialized) { + final ActorRef sender = getSender(); + final ActorRef self = self(); + shardInformation.addRunnableOnInitialized(new Runnable() { + @Override + public void run() { + sender.tell(messageSupplier.get(), self); + } + }); + } else { + getSender().tell(new ActorNotInitialized(), getSelf()); + } + return; } - getSender().tell(message, getSelf()); + getSender().tell(messageSupplier.get(), getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -244,14 +280,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { knownModules.clear(); knownModules.addAll(newModules); - persist(new SchemaContextModules(newModules), new Procedure() { + dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure() { - @Override public void apply(SchemaContextModules param) throws Exception { + @Override + public void apply(SchemaContextModules param) throws Exception { LOG.info("Sending new SchemaContext to Shards"); - if (localShards.size() == 0) { - createLocalShards(schemaContext); - } else { - for (ShardInformation info : localShards.values()) { + for (ShardInformation info : localShards.values()) { + if (info.getActor() == null) { + info.setActor(getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext), + info.getShardId().toString())); + } else { info.getActor().tell(message, getSelf()); } } @@ -265,14 +304,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findPrimary(FindPrimary message) { - final ActorRef sender = getSender(); String shardName = message.getShardName(); // First see if the there is a local replica for the shard - ShardInformation info = localShards.get(shardName); + final ShardInformation info = localShards.get(shardName); if (info != null) { - ActorPath shardPath = info.getActorPath(); - sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable()); + sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + @Override + public Object get() { + return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + } + }); + return; } @@ -331,7 +374,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * runs * */ - private void createLocalShards(SchemaContext schemaContext) { + private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = this.configuration.getMemberShardNames(memberName); @@ -340,11 +383,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext), - shardId.toString()); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, @@ -398,64 +438,98 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - @Override public String persistenceId() { + @Override + public String persistenceId() { return "shard-manager-" + type; } - @VisibleForTesting public Collection getKnownModules() { + @VisibleForTesting + Collection getKnownModules() { return knownModules; } + @VisibleForTesting + DataPersistenceProvider getDataPersistenceProvider() { + return dataPersistenceProvider; + } + private class ShardInformation { + private final ShardIdentifier shardId; private final String shardName; - private final ActorRef actor; - private final ActorPath actorPath; + private ActorRef actor; + private ActorPath actorPath; private final Map peerAddresses; - private boolean shardInitialized = false; //flag that determines if the actor is ready for business - private ShardInformation(String shardName, ActorRef actor, - Map peerAddresses) { + // flag that determines if the actor is ready for business + private boolean actorInitialized = false; + + private final List runnablesOnInitialized = Lists.newArrayList(); + + private ShardInformation(String shardName, ShardIdentifier shardId, + Map peerAddresses) { this.shardName = shardName; - this.actor = actor; - this.actorPath = actor.path(); + this.shardId = shardId; this.peerAddresses = peerAddresses; } - public String getShardName() { + String getShardName() { return shardName; } - public ActorRef getActor(){ + ActorRef getActor(){ return actor; } - public ActorPath getActorPath() { + ActorPath getActorPath() { return actorPath; } - public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void setActor(ActorRef actor) { + this.actor = actor; + this.actorPath = actor.path(); + } + + ShardIdentifier getShardId() { + return shardId; + } + + Map getPeerAddresses() { + return peerAddresses; + } + + void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - if(LOG.isDebugEnabled()) { - LOG.debug( - "Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } - actor - .tell(new PeerAddressResolved(peerId, peerAddress), - getSelf()); + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); + } + + actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + } } } - public boolean isShardInitialized() { - return shardInitialized; + boolean isShardInitialized() { + return getActor() != null && actorInitialized; + } + + void setActorInitialized() { + this.actorInitialized = true; + + for(Runnable runnable: runnablesOnInitialized) { + runnable.run(); + } + + runnablesOnInitialized.clear(); } - public void setShardInitialized(boolean shardInitialized) { - this.shardInitialized = shardInitialized; + void addRunnableOnInitialized(Runnable runnable) { + runnablesOnInitialized.add(runnable); } } @@ -482,6 +556,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } static class SchemaContextModules implements Serializable { + private static final long serialVersionUID = 1L; private final Set modules; SchemaContextModules(Set modules){