CDS: Fix deleteSnapshots criteria in SnapshotManager
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / utils / InMemorySnapshotStore.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.utils;
10
11 import akka.dispatch.Futures;
12 import akka.japi.Option;
13 import akka.persistence.SelectedSnapshot;
14 import akka.persistence.SnapshotMetadata;
15 import akka.persistence.SnapshotSelectionCriteria;
16 import akka.persistence.snapshot.japi.SnapshotStore;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30
31 /**
32  * An akka SnapshotStore implementation that stores data in memory. This is intended for testing.
33  *
34  * @author Thomas Pantelis
35  */
36 public class InMemorySnapshotStore extends SnapshotStore {
37
38     static final Logger LOG = LoggerFactory.getLogger(InMemorySnapshotStore.class);
39
40     private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
41     private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
42
43     public static void addSnapshot(String persistentId, Object snapshot) {
44         List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
45
46         if(snapshotList == null) {
47             snapshotList = new ArrayList<>();
48             snapshots.put(persistentId, snapshotList);
49         }
50
51         synchronized (snapshotList) {
52             snapshotList.add(new StoredSnapshot(new SnapshotMetadata(persistentId, snapshotList.size(),
53                     System.currentTimeMillis()), snapshot));
54         }
55     }
56
57     @SuppressWarnings("unchecked")
58     public static <T> List<T> getSnapshots(String persistentId, Class<T> type) {
59         List<StoredSnapshot> stored = snapshots.get(persistentId);
60         if(stored == null) {
61             return Collections.emptyList();
62         }
63
64         List<T> retList;
65         synchronized (stored) {
66             retList = Lists.newArrayListWithCapacity(stored.size());
67             for(StoredSnapshot s: stored) {
68                 if(type.isInstance(s.data)) {
69                     retList.add((T) s.data);
70                 }
71             }
72         }
73
74         return retList;
75     }
76
77     public static void clear() {
78         snapshots.clear();
79     }
80
81     public static void addSnapshotSavedLatch(String persistenceId) {
82         snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
83     }
84
85     public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
86         if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
87             throw new AssertionError("Snapshot was not saved");
88         }
89
90         return getSnapshots(persistenceId, type).get(0);
91     }
92
93     @Override
94     public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
95             SnapshotSelectionCriteria snapshotSelectionCriteria) {
96         List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
97         if(snapshotList == null){
98             return Futures.successful(Option.<SelectedSnapshot>none());
99         }
100
101         synchronized(snapshotList) {
102             for(int i = snapshotList.size() - 1; i >= 0; i--) {
103                 StoredSnapshot snapshot = snapshotList.get(i);
104                 if(matches(snapshot, snapshotSelectionCriteria)) {
105                     return Futures.successful(Option.some(new SelectedSnapshot(snapshot.metadata,
106                             snapshot.data)));
107                 }
108             }
109         }
110
111         return Futures.successful(Option.<SelectedSnapshot>none());
112     }
113
114     private boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
115         return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() &&
116                 snapshot.metadata.timestamp() <= criteria.maxTimestamp();
117     }
118
119     @Override
120     public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
121         List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
122
123         if(snapshotList == null){
124             snapshotList = new ArrayList<>();
125             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
126         }
127         synchronized (snapshotList) {
128             snapshotList.add(new StoredSnapshot(snapshotMetadata, o));
129         }
130
131         CountDownLatch latch = snapshotSavedLatches.get(snapshotMetadata.persistenceId());
132         if(latch != null) {
133             latch.countDown();
134         }
135
136         return Futures.successful(null);
137     }
138
139     @Override
140     public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
141     }
142
143     @Override
144     public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
145         List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
146
147         if(snapshotList == null){
148             return;
149         }
150
151         synchronized (snapshotList) {
152             for(int i=0;i<snapshotList.size(); i++){
153                 StoredSnapshot snapshot = snapshotList.get(i);
154                 if(snapshotMetadata.equals(snapshot.metadata)){
155                     snapshotList.remove(i);
156                     break;
157                 }
158             }
159         }
160     }
161
162     @Override
163     public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
164             throws Exception {
165         LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistentId,
166                 snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
167
168         List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
169         if(snapshotList == null){
170             return;
171         }
172
173         synchronized (snapshotList) {
174             Iterator<StoredSnapshot> iter = snapshotList.iterator();
175             while(iter.hasNext()) {
176                 StoredSnapshot s = iter.next();
177                 if(matches(s, snapshotSelectionCriteria)) {
178                     LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
179                             s.metadata.sequenceNr(), s.metadata.timestamp());
180
181                     iter.remove();
182                 }
183             }
184         }
185     }
186
187     private static class StoredSnapshot {
188         private final SnapshotMetadata metadata;
189         private final Object data;
190
191         private StoredSnapshot(SnapshotMetadata metadata, Object data) {
192             this.metadata = metadata;
193             this.data = data;
194         }
195     }
196 }