2011-09-04 85 views
1

我試圖使用 Berkeley DB 實現一個支援併發的持久化隊列。作爲第一步,我嘗試讓兩個進程同時向同一個數據庫追加記錄(標籤:Berkeley DB、併發隊列):

#include <unistd.h> 
#include <sstream> 
#include <db_cxx.h> 

class Queue : public DbEnv 
{ 
    public: 
    Queue () : 
     DbEnv(0), 
     db(0) 
    { 
     set_flags(DB_CDB_ALLDB, 1); 
     open("/tmp/db", DB_INIT_LOCK | 
       DB_INIT_LOG | 
       DB_INIT_TXN | 
       DB_INIT_MPOOL | 
       DB_RECOVER | 
       DB_CREATE  | 
       DB_THREAD, 
       0); 

     db = new Db(this, 0); 
     db->set_flags(DB_RENUMBER); 
     db->open(NULL, "db", NULL, DB_RECNO, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0); 
    } 
    virtual ~Queue() 
    { 
     db->close(0); 
     delete db; 
     close(0); 
    } 

    protected: 
    Db * db; 
}; 

class Enqueue : public Queue 
{ 
    public: 
    Enqueue () : Queue() { } 
    virtual ~Enqueue() { } 

    bool push(const std::string& s) 
    { 
     int res; 
     DbTxn * txn; 

     try { 
      txn_begin(NULL, &txn, DB_TXN_SYNC | DB_TXN_WAIT); 

      db_recno_t k0[4]; // not sure how mutch data is needs??? 
      k0[0] = 0; 

      Dbt val((void*)s.c_str(), s.length()); 
      Dbt key((void*)&k0, sizeof(k0[0])); 
      key.set_ulen(sizeof(k0)); 
      key.set_flags(DB_DBT_USERMEM); 

      res = db->put(txn, &key, &val, DB_APPEND); 

      if(res == 0) { 
       txn->commit(0); 
       return true; 

      } else { 
       std::cerr << "push failed: " << res << std::endl; 
       txn->abort(); 
       return false; 

      } 
     } catch(DbException e) { 
      std::cerr << "DB What()" << e.what() << std::endl; 
      txn->abort(); 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "What()" << e.what() << std::endl; 
      txn->abort(); 
      return false; 
     } catch(...) { 
      std::cerr << "Unknown error" << std::endl; 
      txn->abort(); 
      return false; 
     } 
    } 
}; 

using namespace std; 

// Forks once so that two processes append to the same database concurrently;
// each process pushes the ten strings "asdf0" .. "asdf9".
//
// Returns 0 when every push succeeded, 1 when fork() failed or a push failed.
int main(int argc, const char *argv[])
{
    (void)argc;
    (void)argv;

    // Parent and child both continue below; only a failed fork is an error.
    if (fork() < 0) {
        std::cerr << "fork failed" << std::endl;
        return 1;
    }

    Enqueue e;

    std::stringstream ss;
    for (int i = 0; i < 10; i++) {
        ss.str("");
        ss << "asdf" << i;
        std::cout << ss.str() << std::endl;

        // Stop at the first failure and report it through the exit status.
        if (!e.push(ss.str()))
            return 1;
    }

    return 0;
}

編譯它:

g++ test.cxx -I/usr/include/db4.8 -ldb_cxx-4.8 

創建數據庫目錄:

mkdir /tmp/db 

運行時我會遇到各種各樣的錯誤(段錯誤、內存分配錯誤),不過有時它又能正常工作。

我確定自己遺漏了某些鎖定操作,但不知道該怎麼做。因此,非常歡迎任何有助於解決這個問題的提示和/或建議。

回答

2

僅作記錄:以下是我經過更多 Google 搜索和反覆試驗之後最終採用的解決方案。

該應用程序是一個「回傳」(call home)進程:生產者負責添加數據,消費者則嘗試把數據發送回去。如果消費者發送失敗,就必須重試。消費者嘗試讀取數據時,數據庫不能阻塞生產者。

代碼中使用了一個文件鎖,只允許同時存在一個消費者進程。

這裏是代碼:

#include <db_cxx.h> 
#include <sstream> 
#include <fstream> 
#include <vector> 

#include <boost/interprocess/sync/file_lock.hpp> 

// Berkeley DB environment rooted at /tmp/db that owns one Recno database
// named "db". Derived classes reach the database through the protected `db`
// handle and the environment through the inherited DbEnv interface.
//
// The constructor throws whatever DbEnv::open / Db::open throw; on failure no
// Db handle is leaked.
class Queue : public DbEnv
{
public:
    // sync == true  -> DB_TXN_NOSYNC is switched off.
    // sync == false -> DB_TXN_NOSYNC is switched on.
    Queue(bool sync) :
        DbEnv(0),
        db(0)
    {
        // NOTE(review): DB_CDB_ALLDB is documented as a Concurrent Data Store
        // setting; this environment is opened with DB_INIT_TXN, so the flag
        // probably has no effect here -- confirm before removing it.
        set_flags(DB_CDB_ALLDB, 1);

        set_flags(DB_TXN_NOSYNC, sync ? 0 : 1);

        open("/tmp/db",
             DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN | DB_INIT_MPOOL |
             DB_REGISTER | DB_RECOVER | DB_CREATE | DB_THREAD,
             0);

        db = new Db(this, 0);
        try {
            db->set_flags(DB_RENUMBER);
            db->open(NULL, "db", NULL, DB_RECNO,
                     DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0);
        } catch (...) {
            // The destructor does not run when the constructor throws, so the
            // handle has to be released here before re-throwing.
            delete db;
            db = 0;
            throw;
        }
    }

    virtual ~Queue()
    {
        // Close the database before the environment. Errors are swallowed
        // because a destructor must not let an exception escape.
        try {
            if (db != 0)
                db->close(0);
        } catch (...) {
        }
        delete db;

        try {
            close(0);
        } catch (...) {
        }
    }

protected:
    Db * db;   // owned; created in the constructor, released in the destructor
};

struct Transaction 
{ 
    Transaction() : t(0) { } 

    bool init(DbEnv * dbenv){ 
     try { 
      dbenv->txn_begin(NULL, &t, 0); 
     } catch(DbException e) { 
      std::cerr << "DB What()" << e.what() << std::endl; 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "What()" << e.what() << std::endl; 
      return false; 
     } catch(...) { 
      std::cerr << "Unknown error" << std::endl; 
      return false; 
     } 
     return true; 
    } 

    ~Transaction(){ if(t!=0) t->abort(); } 

    void abort() { t->abort(); t = 0; } 
    void commit() { t->commit(0); t = 0; } 

    DbTxn * t; 
}; 

struct Cursor 
{ 
    Cursor() : c(0) { } 

    bool init(Db * db, DbTxn * t) { 
     try { 
      db->cursor(t, &c, 0); 
     } catch(DbException e) { 
      std::cerr << "DB What()" << e.what() << std::endl; 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "What()" << e.what() << std::endl; 
      return false; 
     } catch(...) { 
      std::cerr << "Unknown error" << std::endl; 
      return false; 
     } 
     return true; 
    } 

    ~Cursor(){ if(c!=0) c->close(); } 
    void close(){ c->close(); c = 0; } 
    Dbc * c; 
}; 

class Enqueue : public Queue 
{ 
public: 
    Enqueue (bool sync) : Queue(sync) { } 
    virtual ~Enqueue() { } 

    bool push(const std::string& s) 
    { 
     int res; 
     Transaction transaction; 

     if(! transaction.init(this)) 
      return false; 

     try { 
      db_recno_t k0[4]; // not sure how mutch data is needs??? 
      k0[0] = 0; 

      Dbt val((void*)s.c_str(), s.length()); 
      Dbt key((void*)&k0, sizeof(k0[0])); 
      key.set_ulen(sizeof(k0)); 
      key.set_flags(DB_DBT_USERMEM); 

      res = db->put(transaction.t, &key, &val, DB_APPEND); 

      if(res == 0) { 
       transaction.commit(); 
       return true; 

      } else { 
       std::cerr << "push failed: " << res << std::endl; 
       return false; 

      } 

     } catch(DbException e) { 
      std::cerr << "DB What()" << e.what() << std::endl; 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "What()" << e.what() << std::endl; 
      return false; 
     } catch(...) { 
      std::cerr << "Unknown error" << std::endl; 
      return false; 
     } 
    } 
}; 

// Opens `f` for writing and closes it again straight away, then hands the
// same pointer back so the call can be used inline where a path is expected.
const char * create_file(const char * f){
    {
        // The stream is closed when it leaves this scope.
        std::ofstream touched(f, std::ios::out);
    }
    return f;
}

class Dequeue : public Queue 
{ 
public: 
    Dequeue (bool sync) : 
     Queue(sync), 
     lock(create_file("/tmp/db-test-pop.lock")), 
     number_of_records_(0) 
    { 
     std::cout << "Trying to get exclusize access to database" << std::endl; 
     lock.lock(); 
    } 

    virtual ~Dequeue() 
    { 
    } 

    bool pop(size_t number_of_records, std::vector<std::string>& records) 
    { 
     if(number_of_records_ != 0) // TODO, warning 
      abort(); 

     Cursor cursor; 
     records.clear(); 

     if(number_of_records_ != 0) 
      abort(); // TODO, warning 

     // Get a cursor 
     try { 
      db->cursor(0, &cursor.c, 0); 
     } catch(DbException e) { 
      std::cerr << "DB What()" << e.what() << std::endl; 
      abort(); 
      return false; 
     } 

     // Read and delete 
     try { 
      Dbt val; 

      db_recno_t k0 = 0; 
      Dbt key((void*)&k0, sizeof(k0)); 

      for(size_t i = 0; i < number_of_records; i ++) { 
       int get_res = cursor.c->get(&key, &val, DB_NEXT); 

       if(get_res == 0) 
        records.push_back(std::string((char *)val.get_data(), val.get_size())); 
       else 
        break; 
      } 

      number_of_records_ = records.size(); 
      if(number_of_records_ == 0) { 
       abort(); 
       return false; 
      } else { 
       return true; 
      } 

     } catch(DbException e) { 
      std::cerr << "DB read/delete What() " << e.what() << std::endl; 
      abort(); 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "DB read/delete What() " << e.what() << std::endl; 
      abort(); 
      return false; 
     } 
    } 

    bool commit() 
    { 
     if(number_of_records_ == 0) 
      return true; 

     Transaction transaction; 
     Cursor  cursor; 

     if(! transaction.init(this)) 
      return false; 

     if(! cursor.init(db, transaction.t)) 
      return false; 

     // Read and delete 
     try { 
      Dbt val; 

      db_recno_t k0 = 0; 
      Dbt key((void*)&k0, sizeof(k0)); 

      for(size_t i = 0; i < number_of_records_; i ++) { 
       int get_res = cursor.c->get(&key, &val, DB_NEXT); 

       if(get_res == 0) 
        cursor.c->del(0); 
       else 
        break; // this is bad! 
      } 

      number_of_records_ = 0; 
      cursor.close(); 
      transaction.commit(); 

      return true; 

     } catch(DbException e) { 
      std::cerr << "DB read/delete What() " << e.what() << std::endl; 
      return false; 
     } catch(std::exception e) { 
      std::cerr << "DB read/delete What() " << e.what() << std::endl; 
      return false; 
     } 
    } 

    void abort() 
    { 
     number_of_records_ = 0; 
    } 

private: 
    boost::interprocess::file_lock lock; 
    size_t number_of_records_; 
    sigset_t orig_mask; 
}; 

如果你發現任何錯誤,或者知道更簡單的實現方法,請告訴我。