2015-05-19 50 views
13

這是我的問題:我需要管理我的程序無法讀取或寫入的遠程連續緩衝區中的內存。它需要具有malloc()/ free()語義,並支持設置最小對齊和分片迴避(儘可能)。由於我無法直接讀取或寫入此緩衝區,因此需要使用本地結構來管理所有分配。我已經使用升壓保存單獨結構的內存堆分配器庫?

,因此,如果內部升壓東西可按摩要做到這一點,那將是巨大的。但是,我並不反對使用C庫或類似的東西。

舉個例子,我需要的非IPC版本:

boost::interprocess::basic_managed_external_buffer< 
        char, 
        boost::interprocess::rbtree_best_fit< 
              boost::interprocess::mutex_family, 
              boost::interprocess::offset_ptr<void>, 
              SOME_ALIGNMENT>, 
        boost::interprocess::iset_index> 

最好使用malloc /免費的語義,而不是新/刪除 但是沒有它曾經實際讀取或寫入底層緩衝區(和保持在一個單獨的緩衝器中的所有分配信息/數據結構)

任何想法?

P.S.我不希望boost :: interprocess示例是誤導,我只是熟悉界面,所以使用它作爲一個例子。應用程序並不是真正的進程間,分配器只能從我的應用程序中使用。

具體來說,我想是能夠與分配大小從128個字節一路管理16GB外部緩衝器至512MB。這是嚴格的64位代碼,但即使如此,我仍希望指針類型是模板參數,因此我可以明確地使用uint64_t。

+0

緩衝區有多大(或很小)?它可以*非常*大? –

+0

具體而言,我希望能夠管理一個16GB的外部緩衝區,分配大小從128字節一直到512MB。那*非常大嗎? –

+0

在什麼電腦上?在超高速計算機上有一個TB的RAM,它不會那麼大!你應該編輯你的問題,而不是在評論中添加信息。 –

回答

0

我發佈了關於我們實際完成的更新。我最終實現了自己的遠程內存分配器(下面的源代碼)。它在靈性上類似於answer Sam suggests,但使用boost侵入式RB樹來避免在釋放,加入等時進行一些日誌(N)查找。它是線程安全的並支持各種遠程指針/偏移量類型作爲模板參數。這在很多方面可能都不是很理想,但對於我們需要做的事情來說,它已經足夠好了。如果你發現錯誤,請告訴我。

/* 
* Thread-safe remote memory allocator 
* 
* Author: Yuriy Romanenko 
* Copyright (c) 2015 Lytro, Inc. 
* 
*/ 

#pragma once 

#include <memory> 
#include <mutex> 
#include <cstdint> 
#include <cstdio> 
#include <functional> 

#include <boost/intrusive/rbtree.hpp> 

namespace bi = boost::intrusive; 

template<typename remote_ptr_t = void*, 
     typename remote_size_t = size_t, 
     typename remote_uintptr_t = uintptr_t> 
class RemoteAllocator 
{ 
    /* Internal structure used for keeping track of a contiguous block of 
    * remote memory. It can be on one or two of the following RB trees: 
    * Free Chunks (sorted by size) 
    * All Chunks (sorted by remote pointer) 
    */ 
    struct Chunk 
    { 
     bi::set_member_hook<> mRbFreeChunksHook; 
     bi::set_member_hook<> mRbAllChunksHook; 

     remote_uintptr_t mOffset; 
     remote_size_t mSize; 
     bool mFree; 

     Chunk(remote_uintptr_t off, remote_size_t sz, bool fr) 
       : mOffset(off), mSize(sz), mFree(fr) 
     { 

     } 

     bool contains(remote_uintptr_t off) 
     { 
      return (off >= mOffset) && (off < mOffset + mSize); 
     } 
    private: 
     Chunk(const Chunk&); 
     Chunk& operator=(const Chunk&); 
    }; 

    struct ChunkCompareSize : public std::binary_function <Chunk,Chunk,bool> 
    { 
     bool operator() (const Chunk& x, const Chunk& y) const 
     { 
      return x.mSize < y.mSize; 
     } 
    }; 
    struct ChunkCompareOffset : public std::binary_function <Chunk,Chunk,bool> 
    { 
     bool operator() (const Chunk& x, const Chunk& y) const 
     { 
      return x.mOffset < y.mOffset; 
     } 
    }; 

    typedef bi::rbtree<Chunk, 
         bi::member_hook<Chunk, 
             bi::set_member_hook<>, 
             &Chunk::mRbFreeChunksHook>, 
         bi::compare<ChunkCompareSize> > FreeChunkTree; 

    typedef bi::rbtree<Chunk, 
         bi::member_hook<Chunk, 
             bi::set_member_hook<>, 
             &Chunk::mRbAllChunksHook>, 
         bi::compare<ChunkCompareOffset> > AllChunkTree; 

    // Thread safety lock 
    std::mutex mLock; 
    // Size of the entire pool 
    remote_size_t mSize; 
    // Start address of the pool 
    remote_ptr_t mStartAddr; 

    // Tree of free chunks 
    FreeChunkTree mFreeChunks; 
    // Tree of all chunks 
    AllChunkTree mAllChunks; 

    // This removes the chunk from both trees 
    Chunk *unlinkChunk(Chunk *c) 
    { 
     mAllChunks.erase(mAllChunks.iterator_to(*c)); 
     if(c->mFree) 
     { 
      mFreeChunks.erase(mFreeChunks.iterator_to(*c)); 
     } 
     return c; 
    } 

    // This reinserts the chunk into one or two trees, depending on mFree 
    Chunk *relinkChunk(Chunk *c) 
    { 
     mAllChunks.insert_equal(*c); 
     if(c->mFree) 
     { 
      mFreeChunks.insert_equal(*c); 
     } 
     return c; 
    } 

    /* This assumes c is 'free' and walks the mAllChunks tree to the left 
    * joining any contiguous free chunks into this one 
    */ 
    bool growFreeLeft(Chunk *c) 
    { 
     auto it = mAllChunks.iterator_to(*c); 
     if(it != mAllChunks.begin()) 
     { 
      it--; 
      if(it->mFree) 
      { 
       Chunk *left = unlinkChunk(&(*it)); 
       unlinkChunk(c); 
       c->mOffset = left->mOffset; 
       c->mSize = left->mSize + c->mSize; 
       delete left; 
       relinkChunk(c); 
       return true; 
      } 
     } 
     return false; 
    } 
    /* This assumes c is 'free' and walks the mAllChunks tree to the right 
    * joining any contiguous free chunks into this one 
    */ 
    bool growFreeRight(Chunk *c) 
    { 
     auto it = mAllChunks.iterator_to(*c); 
     it++; 
     if(it != mAllChunks.end()) 
     { 
      if(it->mFree) 
      { 
       Chunk *right = unlinkChunk(&(*it)); 
       unlinkChunk(c); 
       c->mSize = right->mSize + c->mSize; 
       delete right; 
       relinkChunk(c); 
       return true; 
      } 
     } 
     return false; 
    } 

public: 
    RemoteAllocator(remote_size_t size, remote_ptr_t startAddr) : 
     mSize(size), mStartAddr(startAddr) 
    { 
     /* Initially we create one free chunk the size of the entire managed 
     * memory pool, and add it to both trees 
     */ 
     Chunk *all = new Chunk(reinterpret_cast<remote_uintptr_t>(mStartAddr), 
           mSize, 
           true); 
     mAllChunks.insert_equal(*all); 
     mFreeChunks.insert_equal(*all); 
    } 

    ~RemoteAllocator() 
    { 
     auto it = mAllChunks.begin(); 

     while(it != mAllChunks.end()) 
     { 
      Chunk *pt = unlinkChunk(&(*it++)); 
      delete pt; 
     } 
    } 

    remote_ptr_t malloc(remote_size_t bytes) 
    { 
     std::unique_lock<std::mutex> lock(mLock); 
     auto fit = mFreeChunks.lower_bound(
        Chunk(reinterpret_cast<remote_uintptr_t>(mStartAddr), 
          bytes, 
          true)); 

     /* Out of memory */ 
     if(fit == mFreeChunks.end()) 
      return remote_ptr_t{0}; 

     Chunk *ret = &(*fit); 
     /* We need to split the chunk because it's not the exact size */ 
     /* Let's remove the node */ 
     mFreeChunks.erase(fit); 

     if(ret->mSize != bytes) 
     { 
      Chunk *right, *left = ret; 

      /* The following logic decides which way the heap grows 
      * based on allocation size. I am not 100% sure this actually 
      * helps with fragmentation with such a big threshold (50%) 
      * 
      * Check if we will occupy more than half of the chunk, 
      * in that case, use the left side. */ 
      if(bytes > ret->mSize/2) 
      { 
       right = new Chunk(left->mOffset + bytes, 
            left->mSize - bytes, 
            true); 
       relinkChunk(right); 

       left->mSize = bytes; 
       left->mFree = false; 

       ret = left; 
      } 
      /* We'll be using less than half, let's use the right side. */ 
      else 
      { 
       right = new Chunk(left->mOffset + left->mSize - bytes, 
            bytes, 
            false); 

       relinkChunk(right); 

       left->mSize = left->mSize - bytes; 
       mFreeChunks.insert_equal(*left); 

       ret = right; 
      } 
     } 
     else 
     { 
      ret->mFree = false; 
     } 

     return reinterpret_cast<remote_ptr_t>(ret->mOffset); 
    } 

    remote_ptr_t malloc_aligned(remote_size_t bytes, remote_size_t alignment) 
    { 
     remote_size_t bufSize = bytes + alignment; 
     remote_ptr_t mem = this->malloc(bufSize); 
     remote_ptr_t ret = mem; 
     if(mem) 
     { 
      remote_uintptr_t offset = reinterpret_cast<remote_uintptr_t>(mem); 
      if(offset % alignment) 
      { 
       offset = offset + (alignment - (offset % alignment)); 
      } 
      ret = reinterpret_cast<remote_ptr_t>(offset); 
     } 
     return ret; 
    } 

    void free(remote_ptr_t ptr) 
    { 
     std::unique_lock<std::mutex> lock(mLock); 
     Chunk ref(reinterpret_cast<remote_uintptr_t>(ptr), 0, false); 
     auto it = mAllChunks.find(ref); 
     if(it == mAllChunks.end()) 
     { 
      it = mAllChunks.upper_bound(ref); 
      it--; 
     } 
     if(!(it->contains(ref.mOffset)) || it->mFree) 
      throw std::runtime_error("Could not find chunk to free"); 

     Chunk *chnk = &(*it); 
     chnk->mFree = true; 
     mFreeChunks.insert_equal(*chnk); 

     /* Maximize space */ 
     while(growFreeLeft(chnk)); 
     while(growFreeRight(chnk)); 
    } 

    void debugDump() 
    { 
     std::unique_lock<std::mutex> lock(mLock); 
     int i = 0; 
     printf("----------- All chunks -----------\n"); 
     for(auto it = mAllChunks.begin(); it != mAllChunks.end(); it++) 
     { 
      printf(" [%d] %lu -> %lu (%lu) %s\n", 
       i++, 
       it->mOffset, 
       it->mOffset + it->mSize, 
       it->mSize, 
       it->mFree ? "(FREE)" : "(NOT FREE)"); 
     } 
     i = 0; 
     printf("----------- Free chunks -----------\n"); 
     for(auto it = mFreeChunks.begin(); it != mFreeChunks.end(); it++) 
     { 
      printf(" [%d] %lu -> %lu (%lu) %s\n", 
       i++, 
       it->mOffset, 
       it->mOffset + it->mSize, 
       it->mSize, 
       it->mFree ? "(FREE)" : "(NOT FREE)"); 
     } 
    } 
}; 
1

我不知道,關我的帽子頂部的任何執行罐頭可以使用的。但是,這似乎並不是特別難以實現,只需在C++標準庫中使用花園型容器。

我會建議使用兩個std::map S和一個std::multimap一個簡單的方法。假設bufaddr_t是一個不透明的整數,表示外部緩衝區中的地址。由於我們談論的是16個緩衝區,它必須是一個64位地址:

typedef uint64_t memblockaddr_t; 

同上分配的塊的大小。

typedef uint64_t memblocksize_t; 

你可以,我想,用別的東西memblockaddr_t,只要不透明數據類型有嚴格的弱序。

第一部分很簡單。保持所有分配的塊的軌跡:

std::map<memblockaddr_t, memblocksize_t> allocated; 

所以,當你成功地在外部緩衝區分配的內存塊,你在這裏插入。當你想取消分配一塊內存時,你在這裏查找分配塊的大小,並刪除映射條目。夠簡單。

但是,當然,是不是故事的全部。現在,我們需要跟蹤可用的,未分配的內存塊。讓我們做這樣說:

typedef std::multimap<memblocksize_t, memblockaddr_t> unallocated_t; 

unallocated_t unallocated; 

std::map<memblockaddr_t, unallocated_t::iterator> unallocated_lookup; 

unallocated是所有未分配塊的集合在你的外部緩衝器,由塊大小鍵。關鍵是塊大小。因此,當您需要分配特定大小的內存塊時,可以簡單地使用lower_bound()方法(或者upper_bound(),如果您願意的話)立即找到第一個塊,其大小至少與您想要分配的塊相同。

而且既然你可以有很多相同大小的塊,unallocated必須是std::multimap

此外,unallocated_lookup是一張映射,由每個未分配塊的地址作爲鍵值,爲您提供此塊在unallocated中的條目的迭代器。爲什麼你需要這一點將在一瞬間變得清晰。

所以:

初始化與單個條目的新的,完全未分配的緩衝液:

memblockaddr_t beginning=0; // Or, whatever represents the start of the buffer. 
auto p=unallocated.insert(std::make_pair(BUFFER_SIZE, beginning)).first; 
unallocated_lookup.insert(std::make_pair(beginning, p)); 

然後:)

  1. 要分配的塊,使用LOWER_BOUND(/ UPPER_BOUND ()的方法,正如我上面提到的,找到至少與第一個未分配塊相同的塊,並從unallocatedunallocated_lookup中刪除其條目。如果它超出你的需要,將多餘的部分返還給池,就好像你不需要的額外部分被重新分配(下面的步驟3)。最後,將它插入allocated數組中,以便記住分配的塊有多大。

  2. 解除分配塊,看看它的allocated陣中,得到它的大小,從allocated數組中刪除它,然後:

  3. 將其插入unallocatedunallocated_lookup,類似於如何初始未分配的塊被插入,參見上文。

  4. 但是你還沒有完成。然後必須使用unallocated_lookup來簡單地查找內存緩衝區中的前一個未分配的塊和下一個未分配的塊。如果其中一個或兩個緊鄰新釋放的塊,則必須將它們合併在一起。這應該是一個非常明顯的過程。您可以簡單地通過從unallocatedunallocated_lookup正式刪除相鄰塊的運動,然後釋放單個合併塊。

這就是unallocated_lookup的真正目的,是能夠很容易地合併鄰近的未分配塊。

據我所知,所有上述操作都帶有對數的複雜性。它們完全基於std::mapstd::multimap的對數複雜度方法,而沒有其他。

最後:

根據您的應用程序的行爲,可以輕鬆地調整實施,內部圍捕分配的大塊您所希望的任何倍數的大小。或者調整分配策略 - 從最小的塊中分配足夠大的數據以滿足分配請求,或者從大的未分配的塊中分配(簡單的,使用end()找到它)等等。

這是您自己實現的一個優勢 - 您總是可以靈活地調整自己的實現,那麼您通常會使用一些外部庫。