2016-11-22 166 views
4

亞馬遜已經宣佈他們的new FIFO SQS service,我想在Laravel隊列中使用它來解決一些併發問題。如何在Laravel隊列中容納Amazon FIFO SQS?

我創建了幾個新隊列並更改了配置。但是,我有一個MissingParameter錯誤,說

The request must contain the parameter MessageGroupId. 

所以我修改了文件vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php

public function pushRaw($payload, $queue = null, array $options = []) 
{ 
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 
     'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]); 

    return $response->get('MessageId'); 
} 

public function later($delay, $job, $data = '', $queue = null) 
{ 
    $payload = $this->createPayload($job, $data); 

    $delay = $this->getSeconds($delay); 

    return $this->sqs->sendMessage([ 
     'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay, 
     'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')) 
    ])->get('MessageId'); 
} 

我使用APP_ENV作爲組ID(這是一個消息隊列所以實際上它不很重要,我只希望一切都是先進先出)。

但我仍然收到相同的錯誤消息。我怎麼修復它?任何幫助,將不勝感激。

(順便說一句,這裏擁有SDK定義sendMessage?我可以爲它找到一個存根,但我沒有找到具體的實現)

回答

8

我想指出其他人可能會遇到同樣的問題,雖然編輯SqsQueue.php的作品,它很容易被重置composer installcomposer update。另一種方法是爲SQS FIFO實施新的Illuminate\Queue\Connectors\ConnectorInterface,然後將其添加到Laravel的隊列管理器中。

我的做法如下:

  1. 創建一個新的SqsFifoQueue類,它擴展Illuminate\Queue\SqsQueue但支持SQS FIFO。
  2. 創建一個新的SqsFifoConnector類擴展Illuminate\Queue\Connectors\SqsConnector,該類將使用SqsFifoQueue建立連接。
  3. 創建一個新的SqsFifoServiceProvider,將SqsFifoConnector註冊到Laravel的隊列管理器。
  4. SqsFifoServiceProvider加到您的config/app.php
  5. 更新config/queue.php使用新的SQS FIFO隊列驅動程序。

實施例:

  1. 創建一個新的類SqsFifoQueue延伸Illuminate\Queue\SqsQueue但支持SQS FIFO。

    <?php 
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue 
    { 
        public function pushRaw($payload, $queue = null, array $options = []) 
        { 
         $response = $this->sqs->sendMessage([ 
          'QueueUrl' => $this->getQueue($queue), 
          'MessageBody' => $payload, 
          'MessageGroupId' => uniqid(), 
          'MessageDeduplicationId' => uniqid(), 
         ]); 
    
         return $response->get('MessageId'); 
        } 
    } 
    
  2. 創建一個新的SqsFifoConnector類,它擴展Illuminate\Queue\Connectors\SqsConnector會建立使用SqsFifoQueue連接。

    <?php 
    
    use Aws\Sqs\SqsClient; 
    use Illuminate\Support\Arr; 
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector 
    { 
        public function connect(array $config) 
        { 
         $config = $this->getDefaultConfiguration($config); 
    
         if ($config['key'] && $config['secret']) { 
          $config['credentials'] = Arr::only($config, ['key', 'secret']); 
         } 
    
         return new SqsFifoQueue(
          new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '') 
         ); 
        } 
    } 
    
  3. 創建一個新的SqsFifoServiceProvider是註冊SqsFifoConnector到Laravel的隊列管理器。

    <?php 
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider 
    { 
        public function register() 
        { 
         $this->app->afterResolving('queue', function ($manager) { 
          $manager->addConnector('sqsfifo', function() { 
           return new SqsFifoConnector; 
          }); 
         }); 
        } 
    } 
    
  4. 添加SqsFifoServiceProviderconfig/app.php

    <?php 
    
    return [ 
        'providers'  => [ 
         ... 
         SqsFifoServiceProvider::class, 
        ], 
    ]; 
    
  5. 更新config/queue.php使用新的SQS FIFO隊列驅動程序。

    <?php 
    
    return [ 
    
        'default' => 'sqsfifo', 
    
        'connections' => [ 
         'sqsfifo' => [ 
          'driver' => 'sqsfifo', 
          'key' => 'my_key' 
          'secret' => 'my_secret', 
          'queue' => 'my_queue_url', 
          'region' => 'my_sqs_region', 
         ], 
        ], 
    ]; 
    

那麼你的隊列現在應該支持SQS FIFO隊列。

無恥插頭:在上面的步驟上工作時,我創建了一個laravel-sqs-fifo composer包來處理這個問題,在https://github.com/maqe/laravel-sqs-fifo

+1

我不得不在我的前綴中加入'config/queue.php'中的'sqsfifo'連接,但除此之外,這個工作完美無瑕! –

0
MessageGroupId

除此之外,它需要一個MessageDeduplicationId或啓用基於內容的重複數據刪除。