2014-02-05 34 views
2

我目前正在測試gearman來處理來自web前端的並行任務/請求,gearman cleint通過Ajax接收帶post params的請求,然後創建任務並將它們發送給gearman worker。 21個工作進程實例正在運行,以同時處理來自不同客戶端的多個請求。一次只能有一個客戶端請求工作正常,但當多個客戶端同時請求時,客戶端將得到錯誤的結果以獲取他們請求的信息。 例如,如果客戶端A爲customer_id 123請求的信息,並且客戶端B爲customer_id 456請求了信息並且兩個請求同時觸發,則客戶端A將得到客戶端B的結果,並且客戶端B將得到客戶端A的結果。我嘗試將不同工作人員的職能分開,但存在同樣的問題。請幫我在我的代碼上找到問題。gearman php並行任務對錯誤請求的響應

我使用CURL-multi沒有問題(here),但最近決定嘗試和測試gearman,看看我能否獲得更多的性能。

這裏是我的客戶和員工代碼:

客戶端代碼:

<?php 

header("Expires: Mon, 26 Jul 1997 05:00:00 GMT"); // Date in the past 
header("Cache-Control: no-store, no-cache, must-revalidate"); // HTTP/1.1 
header("Cache-Control: post-check=0, pre-check=0", false); 
header("Pragma: no-cache"); 


$order_id = $_POST['order_id']; 
$customer_id = $_POST['customer_id']; 

//some validation code here 


class DataCollector extends GearmanClient{ 
    private $data  = array(); 
    private $tmpArr = array(); 

    function addData($content){ 
     if($content){ 
      $this->tmpArr = json_decode($content, true); 
      $this->data  = array_merge($this->tmpArr, $this->data); 
      //$this->data[] = json_decode($content, true); 
     } 
    } 
    function getData(){ 
     return $this->data; 
    } 
    function outputData(){ 
     echo json_encode($this->getData()); 
    } 

    function taskCompleted($task){ 

     $this->addData($task->data()); 

    } 

} 


$collector = new DataCollector(); 

$collector->addServer(); 

# set a function to be called when the work is complete 
$collector->setCompleteCallback(array($collector, "taskCompleted")); 


//params to pass to worker 
$queryStr = array(
     "order_id" => $order_id, 
     "customer_id" => $customer_id 
); 

$postData = serialize($queryStr); 

# add tasks to be executed in parallel in Gearman server 
$collector->addTask("getdata_orderDetails", $postData, null, "1"); 
$collector->addTask("getdata_customerDetails", $postData, null, "2"); 

# run the tasks in parallel 
$collector->runTasks(); 


# output the data 
$collector->outputData(); 

?> 

工人代碼:

<?php 

class Worker{ 
    private $worker; 
    static $conn; 

public function __construct(){ 

    try{ 
     self::$conn = oci_connect($user, $pass, $db); // create db connection 
    } 
    catch (Exception $e) { 
     echo "ERROR: " . $e->getMessage(); 
    } 

    $this->worker = new GearmanWorker(); 
    $this->worker->addServer(); 

    # Register functions 
    $this->worker->addFunction("getdata_orderDetails", array($this, "getdata_orderDetails_fn")); 
    $this->worker->addFunction("getdata_customerDetails", array($this, "getdata_customerDetails_fn")); 
} 

public function run(){ 

    while (1) { 
     //print "Waiting for job...\n"; 
     $this->worker->work(); 
     if ($this->worker->returnCode() != GEARMAN_SUCCESS) { 
     echo "return_code: " . $this->worker->returnCode() . "\n"; 
     break; 
     } 
    } 



} 


static function getdata_orderDetails_fn($job){ 


     if(!self::$conn){ 
     $responseArr = array(
      'response_status'   => -1,   //failed 
      'response_message'   => 'Database connection lost', 
      'response_id'    => 'DatabaseConnectionErr' 
      ); 

     return json_encode($responseArr); 
     } 

    $postData = unserialize($job->workload()); 

    $order_id = $postData['order_id']; 
    $customer_id = $postData['customer_id']; 

    $sql = "select order_id, order_status, create_date from customer_order where order_id= :order_id"; 
    $stmt = oci_parse(self::$conn, $sql); 
    oci_bind_by_name($stmt, ":order_id", $order_id, -1); 
    oci_execute($stmt); 
    oci_fetch($stmt); 
    $order_id = oci_result($stmt, 'ORDER_ID'); 
    $order_status = oci_result($stmt, 'ORDER_STATUS'); 
    $create_date = oci_result($stmt, 'CREATE_DATE'); 
    oci_free_statement($stmt); 


    $responseArr = array(
     'response_status' => 1, 
     'response_message' => 'success', 
     'response_id'  => 'order_details', 
     'order_id'   => $order_id, 
     'order_status'  => $order_status, 
     'create_date'   => $create_date 
    ); 

    // send result 
    return json_encode($responseArr); 
} 

static function getdata_customerDetails_fn($job){ 


     if(!self::$conn){ 
     $responseArr = array(
      'response_status' => -1,   //failed 
      'response_message' => 'Database connection lost', 
      'response_id'  => 'DatabaseConnectionErr' 
      ); 

     return json_encode($responseArr); 
     } 

    $postData = unserialize($job->workload()); 

    $order_id = $postData['order_id']; 
    $customer_id = $postData['customer_id']; 

    $sql = "select customer_id, customer_fname, customer_lname, customer_address, customer_contact where customer_id= :customer_id"; 
    $stmt = oci_parse(self::$conn, $sql); 
    oci_bind_by_name($stmt, ":customer_id", $customer_id, -1); 
    oci_execute($stmt); 
    oci_fetch($stmt); 
    $customer_id  = oci_result($stmt, 'CUSTOMER_ID'); 
    $customer_fname  = oci_result($stmt, 'CUSTOMER_FNAME'); 
    $customer_lname  = oci_result($stmt, 'CUSTOMER_LNAME'); 
    $customer_address = oci_result($stmt, 'CUSTOMER_ADDRESS'); 
    $customer_contact = oci_result($stmt, 'CUSTOMER_CONTACT'); 
    oci_free_statement($stmt); 

    $responseArr = array(
     'response_id'  => 'customer_details', 
     'response_status' => 1, 
     'response_message' => 'success', 
     'customer_id'  => $customer_id, 
     'customer_fname' => $customer_fname, 
     'customer_lname' => $customer_lname, 
     'customer_address' => $customer_address, 
     'customer_contact' => $customer_contact 
    ); 

    // send result 
    return json_encode($responseArr); 

} 

}//class 


//start worker 
    $worker = new Worker(); 
    $worker->run(); 


?> 

回答

0

我從Gearman的羣體得到幫助後解決了問題通過使每個任務基於$ postData而是唯一的。 這樣,如果兩個或更多的客戶端請求相同的信息(例如order_id或customer_id),gearman會排隊併發請求具有相同的唯一任務ID,並將回覆所有客戶端具有相同的結果(即一個請求將被處理):

$collector->addTask("getdata_orderDetails", $postData, null, md5($order_id); 
$collector->addTask("getdata_customerDetails", $postData, null, md5($customer_id); 
1

我認爲問題是,你的任務ID不是唯一。嘗試唯一的ID分配給每個任務,或者根本不指定任務ID: https://groups.google.com/forum/m/#!topic/gearman/q3EV7mvHKDs

我的情況的解決方案是:

$collector->addTask("getdata_orderDetails", $postData); 
$collector->addTask("getdata_customerDetails", $postData); 

# run the tasks in parallel 
$collector->runTasks(); 
+0

謝謝,你是賴特。我在這裏得到了同樣的答案:https://groups.google.com/forum/m/#!topic/gearman/q3EV7mvHKDs –

+0

你的答案也是正確的,但對於我的情況,最好是根據$生成任務唯一標識符postData,請參閱下面的答案。 –