X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6a6a181b6c03ac744d02a3e8e815011d2cf99c3f;hp=abcde747b93b132f8c492f25ac8af43f96c84a26;hb=0875a6258aa459ccf558529bdf1524c7c8986c54;hpb=9cd4e7995210f8381892004373acc71c8b3ae7af diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index abcde747b9..6a6a181b6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -14,6 +14,7 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import akka.persistence.RecoveryFailure; import akka.serialization.Serialization; import com.google.common.base.Optional; @@ -21,7 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - +import com.google.protobuf.ByteString; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; @@ -96,16 +97,17 @@ public class Shard extends RaftActor { private final List dataChangeListeners = new ArrayList<>(); - private final ShardContext shardContext; + private final DatastoreContext datastoreContext; + private SchemaContext schemaContext; private Shard(ShardIdentifier name, Map peerAddresses, - ShardContext shardContext) { + DatastoreContext datastoreContext) { super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams)); this.name = name; - this.shardContext = shardContext; + this.datastoreContext = datastoreContext; String setting = System.getProperty("shard.persistent"); @@ -114,10 +116,11 @@ public class Shard extends RaftActor { LOG.info("Shard created : {} persistent : {}", name, persistent); store = InMemoryDOMDataStoreFactory.create(name.toString(), null, - shardContext.getDataStoreProperties()); + datastoreContext.getDataStoreProperties()); shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString()); + } private static Map mapPeerAddresses( @@ -134,16 +137,27 @@ public class Shard extends RaftActor { public static Props props(final ShardIdentifier name, final Map peerAddresses, - ShardContext shardContext) { + DatastoreContext datastoreContext) { Preconditions.checkNotNull(name, "name should not be null"); Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); - Preconditions.checkNotNull(shardContext, "shardContext should not be null"); + Preconditions.checkNotNull(datastoreContext, "shardContext should not be null"); - return Props.create(new ShardCreator(name, peerAddresses, shardContext)); + return Props.create(new ShardCreator(name, peerAddresses, datastoreContext)); + } + + @Override public void onReceiveRecover(Object message) { + LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(), + getSender()); + + if (message instanceof RecoveryFailure){ + LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause"); + } else { + super.onReceiveRecover(message); + } } @Override public void onReceiveCommand(Object message) { - LOG.debug("Received message {} from {}", message.getClass().toString(), + LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(), getSender()); if (message.getClass() @@ -185,7 +199,7 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(), - schemaContext, shardContext), transactionId.toString()); + schemaContext,datastoreContext, name.toString()), transactionId.toString()); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -194,7 +208,7 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(store.newReadWriteTransaction(), getSelf(), - schemaContext, shardContext), transactionId.toString()); + schemaContext, datastoreContext,name.toString()), transactionId.toString()); } else if (createTransaction.getTransactionType() @@ -204,7 +218,7 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(), - schemaContext, shardContext), transactionId.toString()); + schemaContext, datastoreContext, name.toString()), transactionId.toString()); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -226,7 +240,8 @@ public class Shard extends RaftActor { .tell(new CreateTransactionReply( Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(), - getSelf()); + getSelf() + ); } private void commit(final ActorRef sender, Object serialized) { @@ -264,9 +279,9 @@ public class Shard extends RaftActor { Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Void v) { - sender.tell(new CommitTransactionReply().toSerializable(),self); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(new Date()); + sender.tell(new CommitTransactionReply().toSerializable(), self); + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(new Date()); } @Override @@ -343,7 +358,7 @@ public class Shard extends RaftActor { private void createTransactionChain() { DOMStoreTransactionChain chain = store.createTransactionChain(); ActorRef transactionChain = getContext().actorOf( - ShardTransactionChain.props(chain, schemaContext, shardContext)); + ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() )); getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(), getSelf()); } @@ -363,7 +378,6 @@ public class Shard extends RaftActor { identifier, clientActor.path().toString()); } - } else { LOG.error("Unknown state received {}", data); } @@ -381,11 +395,11 @@ public class Shard extends RaftActor { } - @Override protected Object createSnapshot() { + @Override protected void createSnapshot() { throw new UnsupportedOperationException("createSnapshot"); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { throw new UnsupportedOperationException("applySnapshot"); } @@ -423,18 +437,18 @@ public class Shard extends RaftActor { final ShardIdentifier name; final Map peerAddresses; - final ShardContext shardContext; + final DatastoreContext datastoreContext; ShardCreator(ShardIdentifier name, Map peerAddresses, - ShardContext shardContext) { + DatastoreContext datastoreContext) { this.name = name; this.peerAddresses = peerAddresses; - this.shardContext = shardContext; + this.datastoreContext = datastoreContext; } @Override public Shard create() throws Exception { - return new Shard(name, peerAddresses, shardContext); + return new Shard(name, peerAddresses, datastoreContext); } } }