2013-04-02 54 views
6

有沒有人可以提供一個如何在Node.js中實現MySQL事務的例子?我正試着弄懂如何使用node-mysql驅動程序和node-mysql-queue。 Node.js mysql事務

據我所知,使用node-mysql-queue大大降低了Node.js的異步性質,因爲新的查詢必須等到現有的查詢完成。爲了解決這個問題,有人試圖將node-mysql-queue和node-mysql的連接池功能結合起來。即爲每個新的http請求啓動一個新的mysql連接,並在各個連接上啓動事務隊列?

回答

7

下面的事務示例在一個月前被添加到了文檔中:

https://github.com/felixge/node-mysql#transactions

// Insert a post and a matching log entry atomically: either both rows are
// committed or the transaction is rolled back.
connection.beginTransaction(function(err) {
    if (err) { throw err; }

    connection.query('INSERT INTO posts SET title=?', title, function(err, result) {
        if (err) {
            // `return` matters: without it execution falls through, reads
            // `result.insertId` and issues the next query even though the
            // transaction is being rolled back.
            return connection.rollback(function() {
                throw err;
            });
        }

        var log = 'Post ' + result.insertId + ' added';

        connection.query('INSERT INTO log SET data=?', log, function(err, result) {
            if (err) {
                // Stop here so we never attempt to commit a failed transaction.
                return connection.rollback(function() {
                    throw err;
                });
            }

            connection.commit(function(err) {
                if (err) {
                    // Stop here so a failed commit is not reported as a success.
                    return connection.rollback(function() {
                        throw err;
                    });
                }
                console.log('success!');
            });
        });
    });
});
+3

這並沒有真正解決異步的問題。在事務完成之前,連接應該專門用於該事務。 – bvs

0

我想出了一個使用遞歸函數的解決方案。

var sql = 'INSERT INTO logs SET data = ?';

// array of rows to insert
var rows = [[/*first row*/], [/*additional row*/]];

// Insert every entry of `rows` inside one transaction. The rows are inserted
// one after another; the first failure rolls the whole transaction back.
connection.beginTransaction(function (err) {

    if (err) {
        throw err;
    }

    // Position of the next row to insert. A cursor is used instead of
    // rows.shift() so the caller's array is left untouched.
    var nextIndex = 0;

    // Undo the transaction, then surface the error that caused the rollback.
    function rollbackAndThrow(cause) {
        connection.rollback(function () {
            throw cause;
        });
    }

    // Commit once every row has been inserted.
    function noMoreRows() {
        connection.commit(function (err) {
            if (err) {
                // `return` so a failed commit is not reported as a success.
                return rollbackAndThrow(err);
            }
            console.log('success!');
        });
    }

    // Insert the row at `nextIndex`, then recurse for the following one.
    function insertEachRow() {

        if (nextIndex >= rows.length) {
            // Done, now commit
            return noMoreRows();
        }

        var row = rows[nextIndex];
        nextIndex += 1;

        connection.query(sql, row, function (err) {
            if (err) {
                // `return` so no further rows are inserted (and no commit is
                // attempted) after a failure.
                return rollbackAndThrow(err);
            }

            insertEachRow();
        });
    }

    insertEachRow();
});
1

我正在使用以下方法。我的模型中有一個添加函數,用於執行數據庫操作。

add : function (data, callback) { 

    //Begin transaction 
    connection.beginTransaction(function(err) { 
     if (err) { 
      throw err; 
     } 

     var user_query = "INSERT INTO `calldata`.`users` (`username`, `password`, `enabled`, `accountNonExpired`, `accountNonLocked`, `credentialsNonExpired`) VALUES ('" + data.mobile + "', '" + sha1(data.password) + "', '1', '1', '1', '1')"; 
     connection.query(user_query, function(err, results) { 
      if (err) { 
       return connection.rollback(function() { 
        throw err; 
       }); 
      } 

      var accnt_dtls_query = "INSERT INTO `calldata`.`accnt_dtls` (`req_mob_nmbr`, `usr_nme`, `dvce_id`, `mngr_id`, `cmpny_id`, `actve_flg`, `crtd_on`, `usr`) VALUES (" + data.mobile + ", '" + data.name + "', '', " + data.managerId + ", " + data.companyId + ", 1, now(), '" + data.mobile+ "')"; 

      connection.query(accnt_dtls_query, function(err, results) { 
       if (err) { 
        return connection.rollback(function() { 
         throw err; 
        }); 
       } 
       var user_role_query = "INSERT INTO `calldata`.`user_roles` (`username`, `ROLE`) VALUES ('" + data.mobile + "', '" + data.role + "')"; 

       connection.query(user_role_query, function(err, result) { 
        if (err) { 
         return connection.rollback(function() { 
          throw err; 
         }); 
        } 

        //add an entry to manager table 
        var mngr_dtls_query = "INSERT INTO `calldata`.`mngr_dtls` (`mngr_nm`, `cmpny_id`, `crtd_on`, `usr_nm`, `eml_id`) VALUES ('" + data.name + "'," + data.companyId + " , now(), '" + data.mobile + "', '" + data.mobile + "')"; 
        connection.query(mngr_dtls_query, function(err, result) { 
         if (err) { 
          return connection.rollback(function() { 
           throw err; 
          }); 
         } 
         console.log('Changed ' + result.changedRows + ' results'); 
         connection.commit(function (err) { 
          console.log('Commiting transaction.....'); 
          if (err) { 
           return connection.rollback(function() { 
            throw err; 
           }); 
          } 

          console.log('Transaction Complete.'); 
          connection.end(); 
          callback(null, result); 
         }); 
        }); 
       }); 
      }); 
     }); 
    }); 
    //transaction ends here 
} 

,並從控制器調用:

// Save the account and translate the outcome into an HTTP response.
agentAccountModel.add(data, function(err, results) {
       if(err)
       {
        res.status(500);
        // `return` so the success response below is not also sent on the
        // same `res` after the error response.
        return res.json({
         "status": 500,
         "message": err
        });
       }

       res.status(200);
       res.json({
        "status": 200,
        "message": "Saved successfully"

       });
      });
1

我花了一些時間,爲node-mysql提供的事務示例寫了一個通用版本,所以我想在這裏分享一下。我使用Bluebird作爲我的Promise庫,並用它來'promisify'連接對象,這簡化了異步邏輯。

// The `require` calls were missing, which left `Promise` and `mysql` bound to
// plain strings.
const Promise = require('bluebird');
const mysql = require('mysql');

/**
 * Run multiple queries on the database using a transaction. A list of SQL queries
 * should be provided, along with a list of values to inject into the queries.
 *
 * A new connection is opened for each call and closed again before the
 * returned promise settles.
 *
 * @param {array} queries       An array of mysql queries. These can contain `?`s
 *                              which will be replaced with values in `queryValues`.
 * @param {array} [queryValues] An array of arrays that is the same length as `queries`.
 *                              Each array in `queryValues` should contain values to
 *                              replace the `?`s in the corresponding query in `queries`.
 *                              May be omitted when no query has any `?`s.
 * @return {Promise}            A Promise that is fulfilled with an array of the
 *                              results of the passed in queries. The results in the
 *                              returned array are at respective positions to the
 *                              provided queries. It is rejected with the error that
 *                              made the transaction fail, after a rollback attempt.
 */
function transaction(queries, queryValues = []) {
    const connection = mysql.createConnection(databaseConfigs);
    Promise.promisifyAll(connection);

    // Every step is wrapped in a function. Passing the *result* of
    // `connection.xAsync()` to `.then()` would start the call immediately and
    // hand `.then()` a non-function, which it ignores, so nothing would be
    // sequenced.
    return connection.connectAsync()
    .then(() => connection.beginTransactionAsync())
    .then(() => Promise.all(
        queries.map((query, index) => connection.queryAsync(query, queryValues[index]))
    ))
    .then(results => connection.commitAsync().then(() => results))
    .then(
        // Committed: close the connection. This is the success half of a
        // two-argument `.then()`, so a failure while closing does not trigger
        // a rollback of an already committed transaction.
        results => connection.endAsync().then(() => results),
        // Failed before or during commit: roll back and close on a best-effort
        // basis. Errors from the cleanup itself are deliberately dropped so the
        // caller always receives the original error.
        err => connection.rollbackAsync()
            .catch(() => {})
            .then(() => connection.endAsync())
            .catch(() => {})
            .then(() => Promise.reject(err))
    );
}

如果你想像問題中建議的那樣使用連接池,可以很容易地把createConnection那一行換成myPool.getConnection(...),並把connection.end那一行換成connection.release()。

0

我發現了一個有用的鏈接,它介紹了如何在Node.js中把MySQL連接池與事務結合使用。數據庫連接池總是很有用的。可以查看此鏈接:

https://github.com/mysqljs/mysql