2013-02-01 45 views
3

我們要測試,如果一個列有TTL(時間到現場)財產最終會從卡桑德拉完全刪除隨着空行其中包含它。卡桑德拉+赫克託,在試驗力壓實,檢查空行被刪除

正如我理解,該算法用於測試本品行是

  • 保存的對象時,設置TTL爲列當TTL時間經過
  • 等待,檢查返回的值是空
  • 等待當GC_GRACE_SECONDS perion通過
  • 檢查該行也被刪除

我沒有檢查最後一個項目。

正如我發現的(例如,herehere和其他地方),我需要運行壓實。類似的問題已經提出(例如Hector (Cassandra) Delete Anomaly),但我沒有找到任何幫助,谷歌搜索沒有什麼幫助。

所以,問題是,我怎麼能強迫壓實從我的集成測試(使用赫克託),以確保其行爲與預期相同?還有其他方法可以做到這一點嗎?

P.S.截斷列家族不是一種選擇。


這裏是詳細信息。

我的測試:

private static final String KEYSPACE = "KEYSPACE"; 
private static final String COLUMN_FAMILY = "COLUMN_FAMILY"; 

private static final int GC_CRACE_SECONDS = 5; 

// sut 
private CassandraService cassandraService; 

// dependencies 
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", 
    "localhost:9160"); 

private Keyspace keyspace; 

@BeforeClass 
public static void setupBeforeClass() { 
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon(); 
} 

@Before 
public void setUp() throws Exception { 
    keyspace = createKeyspace(KEYSPACE, cluster, 
     new QuorumAllConsistencyLevelPolicy()); 
    cassandraService = new CassandraService(cluster, KEYSPACE, 
     COLUMN_FAMILY, GC_CRACE_SECONDS); 
} 

@Test 
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception { 
    Object obj = "OBJECT"; 
    String rowKey = "key"; 
    String columnName = "columnName"; 
    logger.info("before persisting rows count is {}" + countRows()); 

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5); 

    logger.info("after persisting rows count is {}" + countRows()); 

    Object value = retrieve(rowKey, columnName); 
    assertNotNull(value); 

    logger.info("before TTL passes rows count is {}" + countRows()); 

    TimeUnit.SECONDS.sleep(6); 

    Object nullValue = retrieve(rowKey, columnName); 
    assertNull(nullValue); 

    logger.info("after TTL passes rows count is {}" + countRows()); 

    TimeUnit.SECONDS.sleep(10); 

    logger.info("wait 10 more seconds... rows count is {}" + countRows()); 
    System.out.println("================================" + countRows()); 

    TimeUnit.SECONDS.sleep(120); 

    int countRows = countRows(); 
    logger.info("wait 2 more minutes... rows count is {}" + countRows); 
    assertEquals(0, countRows); 
} 

代碼爲持續存在:一列族

public void persistObjectWithTtl(Object rowKey, Object columnName, 
     Object obj, int ttl) { 
    LOGGER.debug("Persist {}/{}", rowKey, columnName); 
    HColumn<Object, Object> column = createColumn(columnName, obj, 
      SERIALIZER, SERIALIZER); 
    column.setTtl(ttl); 
    executeInsertion(rowKey, column); 
} 

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) { 
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER); 
    mutator.addInsertion(rowKey, this.columnFamilyName, column); 
    mutator.execute(); 
} 

設置GcGraceSeconds:

private void addColumnFamily(String keySpaceName, String columnFamilyName, 
      int gcGraceSeconds) { 
    ColumnFamilyDefinition columnFamilyDefinition = 
     createColumnFamilyDefinition(keySpaceName, columnFamilyName); 

    ThriftCfDef columnFamilyWithGCGraceSeconds = 
     new ThriftCfDef(columnFamilyDefinition); 
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds); 

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds); 
} 

並且支持計數行代碼,​​:

public int countRows() { 
    int rowCount = 100; 

    ObjectSerializer serializer = ObjectSerializer.get(); 
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery = 
      HFactory.createRangeSlicesQuery(keyspace, serializer, 
       serializer, serializer) 
        .setColumnFamily(COLUMN_FAMILY) 
        .setRange(null, null, false, 10) 
        .setRowCount(rowCount); 

    Object lastKey = null; 

    int i = 0; 
    while (true) { 
     rangeSlicesQuery.setKeys(lastKey, null); 

     QueryResult<OrderedRows<Object, Object, Object>> result = 
      rangeSlicesQuery.execute(); 
     OrderedRows<Object, Object, Object> rows = result.get(); 
     Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator(); 

     if (lastKey != null && rowsIterator != null) { 
      rowsIterator.next(); 
     } 

     while (rowsIterator.hasNext()) { 
      Row<Object, Object, Object> row = rowsIterator.next(); 
      lastKey = row.getKey(); 
      i++; 

      if (row.getColumnSlice().getColumns().isEmpty()) { 
       continue; 
      } 
     } 

     if (rows.getCount() < rowCount) { 
      break; 
     } 

    } 

    return i; 
} 

感謝。


更新:

的原因是數據量不enoght壓實運行,所以我需要把更多的數據,並刷新表更頻繁地到磁盤。所以我結束了以下測試案例:

@Test 
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception { 
    final int expectedAmount = 50000; 

    logger.info("before persisting rows count is {}", countRows()); 

    for (int i = 0; i < expectedAmount; i++) { 
     String rowKey = RandomStringUtils.randomAlphanumeric(128); 
     Object obj = RandomStringUtils.randomAlphanumeric(1000); 
     cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20); 

     if (i % 100 == 0) { 
      StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY); 
     } 
    } 

    logger.info("causing major compaction..."); 
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY); 
    logger.info("after major compaction rows count is {}", countRows()); 

    waitAtMost(Duration.TWO_MINUTES) 
     .pollDelay(Duration.TWO_SECONDS) 
     .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS) 
     .until(new Callable<Boolean>() { 
      @Override 
      public Boolean call() throws Exception { 
       int countRows = countRows(); 
       logger.info("the rows count is {}", countRows); 
       return countRows < expectedAmount; 
      } 
     }); 
} 

全碼:test class and sut

回答

1

因爲你是使用Java,您可以輕鬆地強制通過JMX壓實使用org.apache.cassandra.db.StorageService MBean的forceTableCompaction(keyspace, columnFamily)方法。

+0

我試圖通過JConsole的,並導致壓實連接,但該行仍然存在。我在日誌中看到的是「compaction.CompactionManager - 在COLUMN_FAMILY中無法壓縮;如果您希望強制壓縮單個sstables,請使用forceUserDefinedCompaction(例如,對於墓碑集合)「 –

+0

啊,你只需要首先刷新列族,然後就可以在同一個MBean上使用JMX方法:''forceTableFlush(keyspace,columnFamily)'' –

+0

當我嘗試刷新列族第一,我得到相同的信息 –