+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.opendaylight.odlparent</groupId>
- <artifactId>features-parent</artifactId>
- <version>1.7.0-SNAPSHOT</version>
- <relativePath/>
- </parent>
- <artifactId>features-akka</artifactId>
- <groupId>org.opendaylight.controller</groupId>
- <version>1.7.0-SNAPSHOT</version>
- <packaging>pom</packaging>
- <properties>
- <features.file>features.xml</features.file>
- <branding.version>1.3.0-SNAPSHOT</branding.version>
- <karaf.resources.version>1.7.0-SNAPSHOT</karaf.resources.version>
- <feature.test.version>0.9.0-SNAPSHOT</feature.test.version>
- <karaf.empty.version>1.7.0-SNAPSHOT</karaf.empty.version>
- <surefire.version>2.16</surefire.version>
- <akka.version>2.3.14</akka.version>
- <scala.version>2.11</scala.version>
- </properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>commons.opendaylight</artifactId>
- <version>1.7.0-SNAPSHOT</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j_${scala.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-osgi_${scala.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.uncommons.maths</groupId>
- <artifactId>uncommons-maths</artifactId>
- <exclusions>
- <exclusion>
- <groupId>jfree</groupId>
- <artifactId>jcommon</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jfree</groupId>
- <artifactId>jfreechart</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.9.9.Final</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote_${scala.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster_${scala.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.iq80.leveldb</groupId>
- <artifactId>leveldb</artifactId>
- </dependency>
- <dependency>
- <groupId>org.fusesource.leveldbjni</groupId>
- <artifactId>leveldbjni-all</artifactId>
- </dependency>
- <!-- test to validate features.xml -->
- <dependency>
- <groupId>org.opendaylight.odlparent</groupId>
- <artifactId>features-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- dependency for opendaylight-karaf-empty for use by testing -->
- <dependency>
- <groupId>org.opendaylight.controller</groupId>
- <artifactId>opendaylight-karaf-empty</artifactId>
- <version>${karaf.empty.version}</version>
- <scope>test</scope>
- <type>zip</type>
- </dependency>
- <!-- Uncomment this if you get an error : java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Lorg/slf4j/helpers/FormattingTuple;
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <version>1.7.2</version>
- </dependency>
- -->
-
- </dependencies>
- <scm>
- <connection>scm:git:http://git.opendaylight.org/gerrit/controller.git</connection>
- <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
- <tag>HEAD</tag>
- <url>https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=summary</url>
- </scm>
-</project>
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- vi: set et smarttab sw=4 tabstop=4: -->
-<!--
- Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
-
- This program and the accompanying materials are made available under the
- terms of the Eclipse Public License v1.0 which accompanies this distribution,
- and is available at http://www.eclipse.org/legal/epl-v10.html
--->
-<features name="odl-controller-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.0 http://karaf.apache.org/xmlns/features/v1.2.0">
- <feature name='odl-akka-all' version='${project.version}' description='OpenDaylight :: Akka :: All'>
- <feature version="${scala.version}">odl-akka-scala</feature>
- <feature version="${akka.version}">odl-akka-system</feature>
- <feature version="${akka.version}">odl-akka-clustering</feature>
- <feature version='0.7'>odl-akka-leveldb</feature>
- <feature version="${akka.version}">odl-akka-persistence</feature>
- </feature>
- <feature name="odl-akka-scala" description="Scala Runtime for OpenDaylight" version="${scala.version}">
- <bundle>mvn:org.scala-lang/scala-library/{{VERSION}}</bundle>
- <bundle>mvn:org.scala-lang/scala-reflect/{{VERSION}}</bundle>
- </feature>
- <feature name="odl-akka-system" description="Akka Actor Framework System Bundles" version="${akka.version}">
- <feature version="${scala.version}">odl-akka-scala</feature>
- <bundle>mvn:com.typesafe/config/{{VERSION}}</bundle>
- <bundle>mvn:com.typesafe.akka/akka-actor_${scala.version}/${akka.version}</bundle>
- <bundle>mvn:com.typesafe.akka/akka-slf4j_${scala.version}/${akka.version}</bundle>
- <bundle>mvn:com.typesafe.akka/akka-osgi_${scala.version}/${akka.version}</bundle>
- </feature>
- <feature name="odl-akka-clustering" description="Akka Clustering" version="${akka.version}">
- <feature version="${akka.version}">odl-akka-system</feature>
- <bundle>wrap:mvn:org.uncommons.maths/uncommons-maths/{{VERSION}}</bundle>
- <bundle>mvn:com.google.protobuf/protobuf-java/{{VERSION}}</bundle>
- <bundle>mvn:io.netty/netty/{{VERSION}}</bundle>
- <bundle>mvn:com.typesafe.akka/akka-remote_${scala.version}/${akka.version}</bundle>
- <bundle>mvn:com.typesafe.akka/akka-cluster_${scala.version}/${akka.version}</bundle>
- </feature>
- <feature name='odl-akka-leveldb' description='LevelDB' version='0.7'>
- <bundle>wrap:mvn:org.iq80.leveldb/leveldb/{{VERSION}}</bundle>
- <bundle>mvn:org.fusesource.leveldbjni/leveldbjni-all/{{VERSION}}</bundle>
- </feature>
- <feature name='odl-akka-persistence' description='Akka Persistence' version="${akka.version}">
- <feature version='0.7'>odl-akka-leveldb</feature>
- <feature version="${akka.version}">odl-akka-system</feature>
- <bundle>mvn:com.typesafe.akka/akka-persistence-experimental_${scala.version}/${akka.version}</bundle>
- <bundle>wrap:mvn:com.google.protobuf/protobuf-java/{{VERSION}}$overwrite=merge&DynamicImport-Package=org.opendaylight.controller.protobuff.messages.*;org.opendaylight.controller.cluster.raft.protobuff.client.messages.*</bundle>
- </feature>
-</features>
<controller.mdsal.version>1.4.0-SNAPSHOT</controller.mdsal.version>
<config.version>0.5.0-SNAPSHOT</config.version>
<commons.opendaylight.version>1.7.0-SNAPSHOT</commons.opendaylight.version>
- <akka.version>2.3.14</akka.version>
+ <akka.features.version>1.7.0-SNAPSHOT</akka.features.version>
+ <akka.version>2.4.1</akka.version>
<features.file>features.xml</features.file>
<config.configfile.directory>etc/opendaylight/karaf</config.configfile.directory>
<config.clustering.configfile>05-clustering.xml</config.clustering.configfile>
<type>xml</type>
</dependency>
<dependency>
- <groupId>org.opendaylight.controller</groupId>
+ <groupId>org.opendaylight.odlparent</groupId>
<artifactId>features-akka</artifactId>
- <version>${commons.opendaylight.version}</version>
+ <version>${akka.features.version}</version>
<classifier>features</classifier>
<type>xml</type>
</dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-distributed-datastore</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-dom-broker-config</artifactId>
+ </dependency>
<!-- message-bus -->
<dependency>
<repository>mvn:org.opendaylight.controller/features-config/{{VERSION}}/xml/features</repository>
<repository>mvn:org.opendaylight.controller/features-config-persister/{{VERSION}}/xml/features</repository>
<repository>mvn:org.opendaylight.controller/features-config-netty/{{VERSION}}/xml/features</repository>
- <repository>mvn:org.opendaylight.controller/features-akka/{{VERSION}}/xml/features</repository>
+ <repository>mvn:org.opendaylight.odlparent/features-akka/{{VERSION}}/xml/features</repository>
<feature name='odl-mdsal-all' version='${project.version}' description="OpenDaylight :: MDSAL :: All">
<feature version='${project.version}'>odl-mdsal-broker</feature>
<feature version='${project.version}'>odl-mdsal-broker-local</feature>
<module>config-netty</module>
<module>mdsal</module>
<module>protocol-framework</module>
- <module>akka</module>
<module>extras</module>
<module>benchmark</module>
</modules>
<properties>
<mdsal.version>2.1.0-SNAPSHOT</mdsal.version>
<mdsal.model.version>0.9.0-SNAPSHOT</mdsal.model.version>
- <akka.version>2.3.14</akka.version>
<appauth.version>0.7.0-SNAPSHOT</appauth.version>
<archetype-app-northbound>0.3.0-SNAPSHOT</archetype-app-northbound>
<arphandler.version>0.8.0-SNAPSHOT</arphandler.version>
<jolokia-bridge.version>0.3.0-SNAPSHOT</jolokia-bridge.version>
<karaf.branding.version>1.3.0-SNAPSHOT</karaf.branding.version>
<karaf.shell.version>${karaf.version}</karaf.shell.version>
- <leveldb.version>0.7</leveldb.version>
- <leveldbjni.version>1.8-odl</leveldbjni.version>
<lifecycle.mapping.version>1.0.0</lifecycle.mapping.version>
<logging.bridge.version>0.7.0-SNAPSHOT</logging.bridge.version>
<maven.plugin.api.version>3.0.5</maven.plugin.api.version>
<northbound.jolokia.version>1.7.0-SNAPSHOT</northbound.jolokia.version>
<opendaylight-l2-types.version>2013.08.27.9-SNAPSHOT</opendaylight-l2-types.version>
<osgi-brandfragment.web.version>0.3.0-SNAPSHOT</osgi-brandfragment.web.version>
- <protobuf.version>2.5.0</protobuf.version>
<protocol-framework.version>0.8.0-SNAPSHOT</protocol-framework.version>
<protocol_plugins.openflow.version>0.7.0-SNAPSHOT</protocol_plugins.openflow.version>
<protocol_plugins.stub.version>0.7.0-SNAPSHOT</protocol_plugins.stub.version>
<samples.loadbalancer.northbound.version>0.7.0-SNAPSHOT</samples.loadbalancer.northbound.version>
<samples.simpleforwarding.version>0.7.0-SNAPSHOT</samples.simpleforwarding.version>
<sanitytest.version>0.7.0-SNAPSHOT</sanitytest.version>
- <scala.version>2.11</scala.version>
- <scala.micro.version>7</scala.micro.version>
<security.version>0.7.0-SNAPSHOT</security.version>
<karaf.security.version>0.7.0-SNAPSHOT</karaf.security.version>
<sitedeploy>dav:http://nexus.opendaylight.org/content/sites/site</sitedeploy>
<topologymanager.version>0.7.0-SNAPSHOT</topologymanager.version>
<topologymanager.shell.version>1.3.0-SNAPSHOT</topologymanager.shell.version>
<troubleshoot.web.version>0.7.0-SNAPSHOT</troubleshoot.web.version>
- <uncommons.maths.version>1.2.2a</uncommons.maths.version>
<usermanager.implementation.version>0.7.0-SNAPSHOT</usermanager.implementation.version>
<usermanager.northbound.version>0.3.0-SNAPSHOT</usermanager.northbound.version>
<usermanager.version>0.7.0-SNAPSHOT</usermanager.version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-osgi_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j_${scala.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
<dependency>
<groupId>net.sourceforge.pmd</groupId>
<artifactId>pmd</artifactId>
<artifactId>gmaven-runtime-2.0</artifactId>
<version>1.5</version>
</dependency>
- <dependency>
- <groupId>org.uncommons.maths</groupId>
- <artifactId>uncommons-maths</artifactId>
- <version>${uncommons.maths.version}</version>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
- <dependency>
- <groupId>org.iq80.leveldb</groupId>
- <artifactId>leveldb</artifactId>
- <version>${leveldb.version}</version>
- </dependency>
- <dependency>
- <groupId>org.fusesource.leveldbjni</groupId>
- <artifactId>leveldbjni-all</artifactId>
- <version>${leveldbjni.version}</version>
- </dependency>
<!-- md-sal -->
<dependency>
<artifactId>reflections</artifactId>
<version>0.9.9-RC1</version>
</dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}.${scala.micro.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>${scala.version}.${scala.micro.version}</version>
- </dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>commons.logback_settings</artifactId>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ <artifactId>akka-persistence_${scala.version}</artifactId>
</dependency>
<dependency>
// Delete all the akka snapshots as they will not be needed
persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
- scala.Long.MaxValue()));
+ scala.Long.MaxValue(), 0L, 0L));
// Since we cleaned out the journal, we need to re-write the current election info.
context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
}
context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE));
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE, 0L, 0L));
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
mockRaftActor.handleCommand(captureSnapshotReply);
- SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
+ SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotSuccess);
- SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
+ SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
mockRaftActor.handleCommand(saveSnapshotFailure);
package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
-import akka.japi.Procedure;
-import akka.persistence.PersistentConfirmation;
-import akka.persistence.PersistentId;
+import akka.persistence.AtomicWrite;
import akka.persistence.PersistentImpl;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.apache.commons.lang.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public Future<Void> doAsyncReplayMessages(final String persistenceId, final long fromSequenceNr,
- final long toSequenceNr, final long max, final Procedure<PersistentRepr> replayCallback) {
+ final long toSequenceNr, final long max, final Consumer<PersistentRepr> replayCallback) {
+ LOG.trace("doAsyncReplayMessages for {}: fromSequenceNr: {}, toSequenceNr: {}", persistenceId,
+ fromSequenceNr,toSequenceNr);
return Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (++count <= max && entry.getKey() >= fromSequenceNr && entry.getKey() <= toSequenceNr) {
PersistentRepr persistentMessage =
new PersistentImpl(deserialize(entry.getValue()), entry.getKey(), persistenceId,
- false, null, null);
- replayCallback.apply(persistentMessage);
+ null, false, null, null);
+ replayCallback.accept(persistentMessage);
}
}
}
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
+ LOG.trace("doAsyncReadHighestSequenceNr for {}: fromSequenceNr: {}", persistenceId, fromSequenceNr);
+
// Akka calls this during recovery.
Map<Long, Object> journal = journals.get(persistenceId);
if(journal == null) {
}
@Override
- public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
- return Futures.future(new Callable<Void>() {
+ public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+ return Futures.future(new Callable<Iterable<Optional<Exception>>>() {
@Override
- public Void call() throws Exception {
- for (PersistentRepr repr : messages) {
- LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+ public Iterable<Optional<Exception>> call() throws Exception {
+ for (AtomicWrite write : messages) {
+ // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
+ PersistentRepr[] array = new PersistentRepr[write.payload().size()];
+ write.payload().copyToArray(array);
+ for(PersistentRepr repr: array) {
+ LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
repr.sequenceNr(), repr.payload());
- addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+ addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
- WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
- if(complete != null) {
- if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
- complete.latch.countDown();
+ WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
+ if(complete != null) {
+ if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+ complete.latch.countDown();
+ }
}
}
}
- return null;
+ return Collections.emptyList();
}
}, context().dispatcher());
}
@Override
- public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
- return Futures.successful(null);
- }
-
- @Override
- public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
- return Futures.successful(null);
- }
-
- @Override
- public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
+ public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
LOG.trace("doAsyncDeleteMessagesTo: {}", toSequenceNr);
Map<Long, Object> journal = journals.get(persistenceId);
if(journal != null) {
package org.opendaylight.controller.cluster.raft.utils;
import akka.dispatch.Futures;
-import akka.japi.Option;
import akka.persistence.SelectedSnapshot;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotSelectionCriteria;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
}
@Override
- public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
+ public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId,
SnapshotSelectionCriteria snapshotSelectionCriteria) {
List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
if(snapshotList == null){
- return Futures.successful(Option.<SelectedSnapshot>none());
+ return Futures.successful(Optional.<SelectedSnapshot>empty());
}
synchronized(snapshotList) {
for(int i = snapshotList.size() - 1; i >= 0; i--) {
StoredSnapshot snapshot = snapshotList.get(i);
if(matches(snapshot, snapshotSelectionCriteria)) {
- return Futures.successful(Option.some(new SelectedSnapshot(snapshot.metadata,
+ return Futures.successful(Optional.of(new SelectedSnapshot(snapshot.metadata,
snapshot.data)));
}
}
}
- return Futures.successful(Option.<SelectedSnapshot>none());
+ return Futures.successful(Optional.<SelectedSnapshot>empty());
}
private static boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
}
@Override
- public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
- }
+ public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+ List<StoredSnapshot> snapshotList = snapshots.get(metadata.persistenceId());
- @Override
- public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
- List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
-
- if(snapshotList == null){
- return;
- }
-
- synchronized (snapshotList) {
- for(int i=0;i<snapshotList.size(); i++){
- StoredSnapshot snapshot = snapshotList.get(i);
- if(snapshotMetadata.equals(snapshot.metadata)){
- snapshotList.remove(i);
- break;
+ if (snapshotList != null) {
+ synchronized (snapshotList) {
+ for(int i=0;i<snapshotList.size(); i++){
+ StoredSnapshot snapshot = snapshotList.get(i);
+ if(metadata.equals(snapshot.metadata)){
+ snapshotList.remove(i);
+ break;
+ }
}
}
}
+
+ return Futures.successful(null);
}
@Override
- public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
- throws Exception {
- LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistentId,
- snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
+ public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+ LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistenceId,
+ criteria.maxSequenceNr(), criteria.maxTimestamp());
- List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+ List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
if(snapshotList != null){
synchronized (snapshotList) {
Iterator<StoredSnapshot> iter = snapshotList.iterator();
while(iter.hasNext()) {
StoredSnapshot s = iter.next();
- if(matches(s, snapshotSelectionCriteria)) {
+ if(matches(s, criteria)) {
LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
}
}
- CountDownLatch latch = snapshotDeletedLatches.get(persistentId);
+ CountDownLatch latch = snapshotDeletedLatches.get(persistenceId);
if(latch != null) {
latch.countDown();
}
+
+ return Futures.successful(null);
}
private static class StoredSnapshot {
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ <artifactId>akka-persistence_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
logger-startup-timeout = 300s
actor {
+ warn-about-java-serializer-usage = off
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
]
}
+
+ persistence {
+ journal.plugin = akka.persistence.journal.leveldb
+ snapshot-store.plugin = akka.persistence.snapshot-store.local
+ }
}
}
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ <artifactId>akka-persistence_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@Override
public void onReceiveRecover(final Object message) throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
- message.getClass().toString(), getSender());
- }
-
- if (message instanceof RecoveryFailure){
- LOG.error("{}: Recovery failed because of this cause",
- persistenceId(), ((RecoveryFailure) message).cause());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
+ getSender());
- // Even though recovery failed, we still need to finish our recovery, eg send the
- // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
- onRecoveryComplete();
- } else {
- super.onReceiveRecover(message);
- if(LOG.isTraceEnabled()) {
- appendEntriesReplyTracker.begin();
- }
+ super.onReceiveRecover(message);
+ if (LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
}
}
private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
persistenceId());
- deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
+ 0, 0));
}
private static class ForwardedAddServerReply {
CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+ InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
+
DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
// Create the write Tx
CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
+ InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
+
DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
// Create the read-write Tx
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
+ <artifactId>akka-persistence_${scala.version}</artifactId>
</dependency>
<dependency>
<artifactId>akka-osgi_${scala.version}</artifactId>
</dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j_${scala.version}</artifactId>
- </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-slf4j_${scala.version}</artifactId>
+ </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-persistence-experimental_${scala.version}</artifactId>
- </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-persistence_${scala.version}</artifactId>
+ </dependency>
<!-- SAL Dependencies -->
<dependency>