2013-10-09

Accumulo BatchScanner: one result per range

My general question is: is it possible to have an Accumulo BatchScanner pull back only the first result for each range?

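Now some details about my use case, since there may be a better way to approach this. I have data representing messages from different systems. There can be different types of messages. My users want to ask the system questions such as "give me the most recent message as of a certain time for all of these systems."

My table layout looks like this: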

rowid: system_name, family: message_type, qualifier: masked_timestamp, value: message_text 

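The idea is that the user gives me a list of systems they care about, a message type, and a timestamp. I use a masked timestamp so that the table sorts most recent first. That way, when I scan starting at a timestamp, the first result is the most recent one before that time. I am using a BatchScanner because I search multiple systems per query. Can I make the BatchScanner fetch only the first result of each Range? I cannot specify an exact key, because the most recent message may not match the datetime the user gave.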
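To make the masked-timestamp idea concrete, here is a minimal sketch in plain Java with no Accumulo dependency. The encoding is an assumption, since the post does not say how the mask is computed: it subtracts the timestamp from Long.MAX_VALUE and zero-pads to 19 digits so that lexicographic order matches numeric order. The class and method names (MaskedTimestamp, mask, unmask, latestAtOrBefore) are hypothetical, and a TreeMap stands in for the sorted column qualifiers of one row and family.

```java
import java.util.Map;
import java.util.TreeMap;

class MaskedTimestamp {
    // Assumed encoding: newer timestamps produce smaller strings, so they sort first.
    // Zero-padding to 19 digits keeps string order equal to numeric order
    // for non-negative timestamps.
    static String mask(long epochMillis) {
        return String.format("%019d", Long.MAX_VALUE - epochMillis);
    }

    static long unmask(String masked) {
        return Long.MAX_VALUE - Long.parseLong(masked);
    }

    // The first qualifier at or after mask(queryTime) in sort order belongs to
    // the most recent message whose timestamp is at or before queryTime.
    static Long latestAtOrBefore(TreeMap<String, String> qualifiers, long queryTime) {
        Map.Entry<String, String> entry = qualifiers.ceilingEntry(mask(queryTime));
        return entry == null ? null : unmask(entry.getKey());
    }

    public static void main(String[] args) {
        TreeMap<String, String> qualifiers = new TreeMap<>();
        qualifiers.put(mask(1000L), "oldest message");
        qualifiers.put(mask(2000L), "middle message");
        qualifiers.put(mask(3000L), "newest message");
        System.out.println(latestAtOrBefore(qualifiers, 2500L));
    }
}
```

In this model, a scan that starts at the masked query time and stops after one entry is exactly the "first result per range" behavior the question asks for.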

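Currently I am using the BatchScanner and ignoring all results except the first one for each key. It works for now, but pulling all the data for a given system/type back over the network seems wasteful when I only care about the first result for each system/type.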

Edit

My attempt using FirstEntryInRowIterator:

@Test 
public void testFirstEntryIterator() throws Exception 
{ 
    Connector connector = new MockInstance("inst").getConnector("user", new PasswordToken("password")); 
    connector.tableOperations().create("testing"); 

    BatchWriter writer = writer(connector, "testing"); 
    writer.addMutation(mutation("row", "fam", "qual1", "val1")); 
    writer.addMutation(mutation("row", "fam", "qual2", "val2")); 
    writer.addMutation(mutation("row", "fam", "qual3", "val3")); 
    writer.close(); 

    Scanner scanner = connector.createScanner("testing", new Authorizations()); 
    scanner.addScanIterator(new IteratorSetting(50, FirstEntryInRowIterator.class)); 

    Key begin = new Key("row", "fam", "qual2"); 
    scanner.setRange(new Range(begin, begin.followingKey(PartialKey.ROW_COLFAM_COLQUAL))); 

    int numResults = 0; 
    for (Map.Entry<Key, Value> entry : scanner) 
    { 
        Assert.assertEquals("qual2", entry.getKey().getColumnQualifier().toString());
        numResults++;
    } 

    Assert.assertEquals(1, numResults); 
} 

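My goal is for the returned entry to be ("row", "fam", "qual2", "val2"), but I get 0 results. It almost looks like the iterator is applied before the range? I haven't dug into this yet.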

Answers


This sounds like a good use case for one of Accumulo's SortedKeyValueIterators, specifically the FirstEntryInRowIterator (included in the accumulo-core artifact).

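Create an IteratorSetting with the FirstEntryInRowIterator and add it to your BatchScanner. This returns the first key/value in that system_name and then stops, avoiding the overhead of the client ignoring all the other results.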

A quick modification of the FirstEntryInRowIterator might get you what you want:

/* 
* Licensed to the Apache Software Foundation (ASF) under one or more 
* contributor license agreements. See the NOTICE file distributed with 
* this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0 
* (the "License"); you may not use this file except in compliance with 
* the License. You may obtain a copy of the License at 
* 
*  http://www.apache.org/licenses/LICENSE-2.0 
* 
* Unless required by applicable law or agreed to in writing, software 
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and 
* limitations under the License. 
*/ 
package org.apache.accumulo.core.iterators; 

import java.io.IOException; 
import java.util.Collection; 
import java.util.HashMap; 
import java.util.Map; 

import org.apache.accumulo.core.client.IteratorSetting; 
import org.apache.accumulo.core.data.ByteSequence; 
import org.apache.accumulo.core.data.Key; 
import org.apache.accumulo.core.data.PartialKey; 
import org.apache.accumulo.core.data.Range; 
import org.apache.accumulo.core.data.Value; 
import org.apache.hadoop.io.Text; 

public class FirstEntryInRangeIterator extends SkippingIterator implements OptionDescriber { 

    // options 
    static final String NUM_SCANS_STRING_NAME = "scansBeforeSeek"; 

    // iterator predecessor seek options to pass through 
    private Range latestRange; 
    private Collection<ByteSequence> latestColumnFamilies; 
    private boolean latestInclusive; 

    // private fields 
    private Text lastRowFound; 
    private int numscans; 

    /**
     * convenience method to set the option to optimize the frequency of scans vs. seeks
     */
    public static void setNumScansBeforeSeek(IteratorSetting cfg, int num) {
        cfg.addOption(NUM_SCANS_STRING_NAME, Integer.toString(num));
    }

    // this must be public for OptionsDescriber
    public FirstEntryInRangeIterator() {
        super();
    }

    public FirstEntryInRangeIterator(FirstEntryInRangeIterator other, IteratorEnvironment env) {
        super();
        setSource(other.getSource().deepCopy(env));
    }

    @Override
    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
        return new FirstEntryInRangeIterator(this, env);
    }

    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);
        String o = options.get(NUM_SCANS_STRING_NAME);
        numscans = o == null ? 10 : Integer.parseInt(o);
    }

    // this is only ever called immediately after getting "next" entry
    @Override
    protected void consume() throws IOException {
        if (finished == true || lastRowFound == null)
            return;
        int count = 0;
        while (getSource().hasTop() && lastRowFound.equals(getSource().getTopKey().getRow())) {

            // try to efficiently jump to the next matching key
            if (count < numscans) {
                ++count;
                getSource().next(); // scan
            } else {
                // too many scans, just seek
                count = 0;

                // determine where to seek to, but don't go beyond the user-specified range
                Key nextKey = getSource().getTopKey().followingKey(PartialKey.ROW);
                if (!latestRange.afterEndKey(nextKey))
                    getSource().seek(new Range(nextKey, true, latestRange.getEndKey(), latestRange.isEndKeyInclusive()), latestColumnFamilies, latestInclusive);
                else {
                    finished = true;
                    break;
                }
            }
        }
        lastRowFound = getSource().hasTop() ? getSource().getTopKey().getRow(lastRowFound) : null;
    }

    private boolean finished = true;

    @Override
    public boolean hasTop() {
        return !finished && getSource().hasTop();
    }

    @Override
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
        // save parameters for future internal seeks
        latestRange = range;
        latestColumnFamilies = columnFamilies;
        latestInclusive = inclusive;
        lastRowFound = null;

        super.seek(range, columnFamilies, inclusive);
        finished = false;

        if (getSource().hasTop()) {
            lastRowFound = getSource().getTopKey().getRow();
            if (range.beforeStartKey(getSource().getTopKey()))
                consume();
        }
    }

    @Override
    public IteratorOptions describeOptions() {
        String name = "firstEntry";
        String desc = "Only allows iteration over the first entry per range";
        HashMap<String,String> namedOptions = new HashMap<String,String>();
        namedOptions.put(NUM_SCANS_STRING_NAME, "Number of scans to try before seeking [10]");
        return new IteratorOptions(name, desc, namedOptions, null);
    }

    @Override
    public boolean validateOptions(Map<String,String> options) {
        try {
            String o = options.get(NUM_SCANS_STRING_NAME);
            if (o != null)
                Integer.parseInt(o);
        } catch (Exception e) {
            throw new IllegalArgumentException("bad integer " + NUM_SCANS_STRING_NAME + ":" + options.get(NUM_SCANS_STRING_NAME), e);
        }
        return true;
    }

} 

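It looks like the original poster is creating one range per row. With FirstEntryInRowIterator, that isn't even necessary. A batch scan may still be useful, but it is not needed to get this particular behavior. – Christopher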


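Thanks for the reply, elserj. I started playing with this but haven't gotten what I want yet. I want to dig in a bit deeper. See my edit for what I'm currently seeing. – jeff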


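Ah, so the FirstEntryInRowIterator tries to find the first key/value in your row, starting from the startKey of the range you provide (in your example, it changes the startKey of the range you provided from "row fam:qual2" to "row"). The iterator therefore tries to return the key "row fam:qual1", which falls outside the range you provided, so you get no results. – elserj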
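The difference described in this comment can be sketched with a toy model in plain Java. This is only a simplified illustration, not Accumulo's actual implementation: a TreeMap of strings such as "row fam:qual1" stands in for the sorted table, and the class and method names (FirstEntrySimulation, firstEntryInRow, firstEntryInRange) are hypothetical.

```java
import java.util.TreeMap;

class FirstEntrySimulation {
    // Row-oriented behavior as described in the comment: rewind to the start of
    // the row, take the first key there, and keep it only if it lies inside the
    // requested range [start, end).
    static String firstEntryInRow(TreeMap<String, String> table, String row, String start, String end) {
        String first = table.ceilingKey(row + " ");
        if (first == null || !first.startsWith(row + " ")) {
            return null;
        }
        boolean inRange = first.compareTo(start) >= 0 && first.compareTo(end) < 0;
        return inRange ? first : null;
    }

    // Range-oriented behavior: take the first key at or after the range start,
    // without rewinding to the beginning of the row.
    static String firstEntryInRange(TreeMap<String, String> table, String start, String end) {
        String first = table.ceilingKey(start);
        return (first != null && first.compareTo(end) < 0) ? first : null;
    }

    // Same three entries as the test in the question.
    static TreeMap<String, String> sampleTable() {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("row fam:qual1", "val1");
        table.put("row fam:qual2", "val2");
        table.put("row fam:qual3", "val3");
        return table;
    }

    public static void main(String[] args) {
        String start = "row fam:qual2";
        String end = start + "\u0000"; // exclusive end just past the start key
        System.out.println(firstEntryInRow(sampleTable(), "row", start, end));
        System.out.println(firstEntryInRange(sampleTable(), start, end));
    }
}
```

Under these assumptions the row-oriented lookup lands on "row fam:qual1", which is before the range start, so nothing is returned, while the range-oriented lookup returns "row fam:qual2".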
