import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.protobuf.ByteString;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
- private final ShardContext shardContext;
+ private final DatastoreContext datastoreContext;
+
private SchemaContext schemaContext;
private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- ShardContext shardContext) {
+ DatastoreContext datastoreContext) {
super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
this.name = name;
- this.shardContext = shardContext;
+ this.datastoreContext = datastoreContext;
String setting = System.getProperty("shard.persistent");
LOG.info("Shard created : {} persistent : {}", name, persistent);
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
- shardContext.getDataStoreProperties());
+ datastoreContext.getDataStoreProperties());
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
+
}
private static Map<String, String> mapPeerAddresses(
public static Props props(final ShardIdentifier name,
final Map<ShardIdentifier, String> peerAddresses,
- ShardContext shardContext) {
+ DatastoreContext datastoreContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(shardContext, "shardContext should not be null");
+ Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
- return Props.create(new ShardCreator(name, peerAddresses, shardContext));
+ return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
+ }
+
+ @Override public void onReceiveRecover(Object message) {
+ LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+ getSender());
+
+ if (message instanceof RecoveryFailure){
+ LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+ } else {
+ super.onReceiveRecover(message);
+ }
}
@Override public void onReceiveCommand(Object message) {
- LOG.debug("Received message {} from {}", message.getClass().toString(),
+ LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
getSender());
if (message.getClass()
return getContext().actorOf(
ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
- schemaContext, shardContext), transactionId.toString());
+ schemaContext,datastoreContext, name.toString()), transactionId.toString());
} else if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
- schemaContext, shardContext), transactionId.toString());
+ schemaContext, datastoreContext,name.toString()), transactionId.toString());
} else if (createTransaction.getTransactionType()
return getContext().actorOf(
ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
- schemaContext, shardContext), transactionId.toString());
+ schemaContext, datastoreContext, name.toString()), transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
.tell(new CreateTransactionReply(
Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId()).toSerializable(),
- getSelf());
+ getSelf()
+ );
}
private void commit(final ActorRef sender, Object serialized) {
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void v) {
- sender.tell(new CommitTransactionReply().toSerializable(),self);
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
+ sender.tell(new CommitTransactionReply().toSerializable(), self);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(new Date());
}
@Override
private void createTransactionChain() {
DOMStoreTransactionChain chain = store.createTransactionChain();
ActorRef transactionChain = getContext().actorOf(
- ShardTransactionChain.props(chain, schemaContext, shardContext));
+ ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
getSelf());
}
identifier, clientActor.path().toString());
}
-
} else {
LOG.error("Unknown state received {}", data);
}
}
- @Override protected Object createSnapshot() {
+ @Override protected void createSnapshot() {
throw new UnsupportedOperationException("createSnapshot");
}
- @Override protected void applySnapshot(Object snapshot) {
+ @Override protected void applySnapshot(ByteString snapshot) {
throw new UnsupportedOperationException("applySnapshot");
}
final ShardIdentifier name;
final Map<ShardIdentifier, String> peerAddresses;
- final ShardContext shardContext;
+ final DatastoreContext datastoreContext;
ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
- ShardContext shardContext) {
+ DatastoreContext datastoreContext) {
this.name = name;
this.peerAddresses = peerAddresses;
- this.shardContext = shardContext;
+ this.datastoreContext = datastoreContext;
}
@Override
public Shard create() throws Exception {
- return new Shard(name, peerAddresses, shardContext);
+ return new Shard(name, peerAddresses, datastoreContext);
}
}
}