我們要測試,如果一個列有TTL(時間到現場)財產最終會從卡桑德拉完全刪除隨着空行其中包含它。卡桑德拉+赫克託,在試驗力壓實,檢查空行被刪除
正如我理解,該算法用於測試本品行是
- 保存的對象時,設置TTL爲列當TTL時間經過
- 等待,檢查返回的值是空
- 等待當GC_GRACE_SECONDS perion通過
- 檢查該行也被刪除
我沒有檢查最後一個項目。
正如我發現的(例如,here或here和其他地方),我需要運行壓實。類似的問題已經提出(例如Hector (Cassandra) Delete Anomaly),但我沒有找到任何幫助,谷歌搜索沒有什麼幫助。
所以,問題是,我怎麼能強迫壓實從我的集成測試(使用赫克託),以確保其行爲與預期相同?還有其他方法可以做到這一點嗎?
P.S.截斷列家族不是一種選擇。
這裏是詳細信息。
我的測試:
private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";
private static final int GC_CRACE_SECONDS = 5;
// sut
private CassandraService cassandraService;
// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr",
"localhost:9160");
private Keyspace keyspace;
@BeforeClass
public static void setupBeforeClass() {
EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}
@Before
public void setUp() throws Exception {
keyspace = createKeyspace(KEYSPACE, cluster,
new QuorumAllConsistencyLevelPolicy());
cassandraService = new CassandraService(cluster, KEYSPACE,
COLUMN_FAMILY, GC_CRACE_SECONDS);
}
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
Object obj = "OBJECT";
String rowKey = "key";
String columnName = "columnName";
logger.info("before persisting rows count is {}" + countRows());
cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);
logger.info("after persisting rows count is {}" + countRows());
Object value = retrieve(rowKey, columnName);
assertNotNull(value);
logger.info("before TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(6);
Object nullValue = retrieve(rowKey, columnName);
assertNull(nullValue);
logger.info("after TTL passes rows count is {}" + countRows());
TimeUnit.SECONDS.sleep(10);
logger.info("wait 10 more seconds... rows count is {}" + countRows());
System.out.println("================================" + countRows());
TimeUnit.SECONDS.sleep(120);
int countRows = countRows();
logger.info("wait 2 more minutes... rows count is {}" + countRows);
assertEquals(0, countRows);
}
代碼爲持續存在:一列族
public void persistObjectWithTtl(Object rowKey, Object columnName,
Object obj, int ttl) {
LOGGER.debug("Persist {}/{}", rowKey, columnName);
HColumn<Object, Object> column = createColumn(columnName, obj,
SERIALIZER, SERIALIZER);
column.setTtl(ttl);
executeInsertion(rowKey, column);
}
private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
mutator.addInsertion(rowKey, this.columnFamilyName, column);
mutator.execute();
}
設置GcGraceSeconds:
private void addColumnFamily(String keySpaceName, String columnFamilyName,
int gcGraceSeconds) {
ColumnFamilyDefinition columnFamilyDefinition =
createColumnFamilyDefinition(keySpaceName, columnFamilyName);
ThriftCfDef columnFamilyWithGCGraceSeconds =
new ThriftCfDef(columnFamilyDefinition);
columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);
cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}
並且支持計數行代碼,:
和public int countRows() {
int rowCount = 100;
ObjectSerializer serializer = ObjectSerializer.get();
RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
HFactory.createRangeSlicesQuery(keyspace, serializer,
serializer, serializer)
.setColumnFamily(COLUMN_FAMILY)
.setRange(null, null, false, 10)
.setRowCount(rowCount);
Object lastKey = null;
int i = 0;
while (true) {
rangeSlicesQuery.setKeys(lastKey, null);
QueryResult<OrderedRows<Object, Object, Object>> result =
rangeSlicesQuery.execute();
OrderedRows<Object, Object, Object> rows = result.get();
Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();
if (lastKey != null && rowsIterator != null) {
rowsIterator.next();
}
while (rowsIterator.hasNext()) {
Row<Object, Object, Object> row = rowsIterator.next();
lastKey = row.getKey();
i++;
if (row.getColumnSlice().getColumns().isEmpty()) {
continue;
}
}
if (rows.getCount() < rowCount) {
break;
}
}
return i;
}
感謝。
更新:
的原因是數據量不enoght壓實運行,所以我需要把更多的數據,並刷新表更頻繁地到磁盤。所以我結束了以下測試案例:
@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
final int expectedAmount = 50000;
logger.info("before persisting rows count is {}", countRows());
for (int i = 0; i < expectedAmount; i++) {
String rowKey = RandomStringUtils.randomAlphanumeric(128);
Object obj = RandomStringUtils.randomAlphanumeric(1000);
cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);
if (i % 100 == 0) {
StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
}
}
logger.info("causing major compaction...");
StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
logger.info("after major compaction rows count is {}", countRows());
waitAtMost(Duration.TWO_MINUTES)
.pollDelay(Duration.TWO_SECONDS)
.pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
.until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
int countRows = countRows();
logger.info("the rows count is {}", countRows);
return countRows < expectedAmount;
}
});
}
我試圖通過JConsole的,並導致壓實連接,但該行仍然存在。我在日誌中看到的是「compaction.CompactionManager - 在COLUMN_FAMILY中無法壓縮;如果您希望強制壓縮單個sstables,請使用forceUserDefinedCompaction(例如,對於墓碑集合)「 –
啊,你只需要首先刷新列族,然後就可以在同一個MBean上使用JMX方法:''forceTableFlush(keyspace,columnFamily)'' –
當我嘗試刷新列族第一,我得到相同的信息 –