2012-11-20 124 views
3

如何通過行數讀取和分割/分塊文件?C++ - 如何分塊文件進行同步/異步處理?

我想分割一個文件到不同的緩衝區,同時確保一條線不會在兩個或多個緩衝區之間分割。我打算將這些緩衝區傳遞到它們自己的pthread中,以便它們可以執行某種類型的同步/異步處理。

我已閱讀reading and writing in chunks on linux using c下面的答案,但我不認爲它完全回答了確保一條線不會分成兩個或多個緩衝區的問題。

+2

這些類型的方案從未按照預期工作。你有一個擁有多核的處理器,這是讓多個線程高效工作的好方法。但是,你仍然只有一個磁盤,線程只是等待輪到它讀取。 –

+0

@HansPassant:如果OP知道任務是CPU綁定的,那麼可能並非如此。但是,是的,你很可能是正確的。雖然'pbzip'和'xz'都在塊級使用這種技術,效果很好。 – Omnifarious

+0

我其實並沒有想到這一點。自那時起,我就想到了解決我的宏觀問題的更簡潔的方法,而不涉及使用我提出的問題。儘管如此,我還是好奇的! – Ken

回答

1

我會選擇以字節爲單位的塊大小。然後,我會在文件中尋找適當的位置,並一次讀取少量的字節,直到獲得換行符。

第一個塊的最後一個字符是換行符。第二個塊的第一個字符是換行符之後的字符。

總是尋求一個pagesize()邊界並一次讀入pagesize()字節來搜索你的換行符。這樣可以確保您只需從磁盤中獲取所需的最低限額即可找到您的界限。您可以嘗試一次讀取128字節或其他內容。但是你可能會冒更多的系統調用的風險。

我寫了一個示例程序,用於計算字母頻率。當然,這在很大程度上毫無意義地分解爲線程,因爲它幾乎可以肯定IO限制。換行符也不重要,因爲它不是面向行的。但是,這只是一個例子。另外,它非常依賴於你有一個相當完整的C++ 11實現。

他們主要功能是這樣的:

// Find the offset of the next newline given a particular desired offset. 
off_t next_linestart(int fd, off_t start) 
{ 
    using ::std::size_t; 
    using ::ssize_t; 
    using ::pread; 

    const size_t bufsize = 4096; 
    char buf[bufsize]; 

    for (bool found = false; !found;) { 
     const ssize_t result = pread(fd, buf, bufsize, start); 
     if (result < 0) { 
     throw ::std::system_error(errno, ::std::system_category(), 
            "Read failure trying to find newline."); 
     } else if (result == 0) { 
     // End of file 
     found = true; 
     } else { 
     const char * const nl_loc = ::std::find(buf, buf + result, '\n'); 
     if (nl_loc != (buf + result)) { 
      start += ((nl_loc - buf) + 1); 
      found = true; 
     } else { 
      start += result; 
     } 
     } 
    } 
    return start; 
} 

還要注意,我用pread。當你有多個線程從文件的不同部分讀取時,這是絕對必要的。

文件描述符是您的線程之間的共享資源。當一個線程使用普通函數從文件讀取時,它會改變關於這個共享資源(文件指針)的細節。文件指針是文件中下一次讀取的位置。

在您每次閱讀之前,只需使用lseek就不會有幫助,因爲它會在lseekread之間引入爭用條件。

pread函數允許您從文件中的特定位置讀取一堆字節。它也不會改變文件指針。除了不改變文件指針的事實外,它就像在同一個調用中合併lseekread一樣。

+0

你怎麼知道你什麼時候換了換行符? – Ken

+1

@Ken:掃描您爲新行字符讀取的緩衝區。記下緩衝區起始處的文件偏移量(你在哪裏搜索到)和新行緩衝區內的偏移量。添加這兩個,你得到一個接近你的塊大小的換行符的文件偏移量。 – Omnifarious

+0

@Ken - 我主要是給你寫了一個使用我的技術的示例程序。我不會去努力確保文件訪問是頁面對齊的。如果你真的想盡可能快地做出事情,你應該確保所有的文件訪問都是頁面對齊的,如果可能的話,那些緩衝區也是頁面對齊的。這允許OS通過簡單地將緩衝區映射到進程的地址空間而不是複製來優化讀取。 – Omnifarious

2

文件是如何編碼的?如果每個字節表示一個字符,我會做到以下幾點:

  1. 存儲器映射使用mmap()文件。
  2. 通過根據適當的塊大小對其進行計算來告訴他們大致的開始和結束。
  3. 通過查找下一個'\n'找到每個作業的實際開始和結束。
  4. 同時處理相應的塊。
  5. 請注意,第一塊需要特殊處理,因爲它的開始不是近似的,而是確切的。
0

定義緩衝區的類。爲每個頁面分配一個大小爲頁面大小倍數的大型緩衝區空間和一個開始/結束索引,一種從傳入流中讀取緩衝區的方法以及另一個* buffer實例作爲參數的'lineParse'方法。

製作一些*緩衝區並將它們存儲在生產者 - 消費者池隊列中。打開文件,從池中獲取一個緩衝區,並從頭到尾讀入緩衝區空間(返回錯誤/ EOF的布爾值)。從池中獲取另一個*緩衝區並將其傳遞到前一個的lineparse()。在那裏,從數據的末尾向後搜索,尋找newLine。找到後,重新加載最後一行的末尾索引和memcpy片段(如果有的話 - 你可能偶爾會很幸運:),進入新的,傳遞的*緩衝區並設置其開始索引。第一個緩衝區現在有整行,可以排隊等待處理這些行的線程。第二個緩衝區具有從第一個緩衝區複製而來的行的片段,並且更多數據可以從磁盤讀取到其開始索引處的緩衝區空間中。

線處理線程可以將'used'*緩衝區回收到池中。

繼續下去,直到EOF(或錯誤:)。

如果可以,請將方法添加到執行緩衝區處理的緩衝區類中。

使用大緩衝區類並從結尾解析將會比持續讀取小比特,從一開始就尋找換行符更加高效。線程間通信速度慢,可以通過的緩衝區越大越好。

使用緩衝池消除了連續的新/刪除並提供了流控制 - 如果磁盤讀取線程比處理更快,則池將清空,並且磁盤讀取線程將阻塞,直到某些使用的緩衝區被回收。這可以防止內存失控。

請注意,如果您使用多個處理線程,則緩衝區可能會「無序」處理 - 這可能或可能不重要。

通過確保與磁盤讀取延遲並行處理的行的優點大於線程間通信的開銷,您只能在這種情況下獲得收益 - 在線程之間傳遞小緩衝區很可能是反向的,生產力。

整體速度較快,但延遲較大的網絡磁盤可能會遇到最大的加速。