2017-10-07 83 views
0

我想調用的RxJS方式AWS lambda函數:如何調用AWS LAMBDA與Observable.bindNodeCallback(lambda.invoke)

invokeLambda(): Observable<string> { 
    const lambda = new AWS.Lambda({region: environment.region, apiVersion: "2015-03-31"}); 
    const invoke$ = (functionName, payload, invocationType = "RequestResponse") => { 
    return Observable.bindNodeCallback(lambda.invoke)(); 
}; 

return invoke$(environment.functionName, {}).map((result: InvocationResponse) => JSON.parse(<string>result.Payload)); 

}

但是當我嘗試:

this.myService.invokeLambda().subscribe(() => { dosomething(); } 

我得到的錯誤:

page.html:5 ERROR TypeError: this.makeRequest is not a function 
at svc.(anonymous function) 

我做錯了什麼?

+2

似乎上下文不綁定,你試過'bindNodeCallback(lambda.invoke.bind(lambda))...'? –

+0

像一個迷人的工作。謝謝 –

回答

0

我建議創建一個門面以將http請求分發到作爲AWS Lambda代理的Amazon API網關。您可以將AWS SDK事件處理程序包裝爲可觀察值。在以下示例中,我使用rxjs搜索身份池名稱,並確定名稱是否存在於我的AWS賬戶內的Amazon Cognito聯合身份身份池列表中。我將Lambda函數與AWS Step Functions集成以增強rxjs「下一個」通道功能。 Rxjs,AWS Lambda和AWS Step Functions重試任務狀態定義允許我通過分配cognitoidentity.listIdentityPools名稱來「繼續」Amazon Cognito聯合身份標識池名稱存在搜索。 Step Functions在使用rxjs時可以遞歸執行lambda函數。通過將第一次執行lambda函數返回的分頁標記作爲參數返回到下一個通過step函數狀態遞歸執行lambda函數。即

exports.handler = (event, context, callback) => { 

    AWS.config.region = "..."; 

    AWS.config.apiVersions = { 
    cognitoidentity: '2014-06-30' 
    }; 

    var cognitoidentity = new AWS.CognitoIdentity(); 

    var params = { 
    MaxResults: 1, 
    get NextToken() { 
     if (event.NextToken == null || undefined) { 
     return null; 
     } else { 
     return event.NextToken; 
     } 
    } 
    }; 

    var eventResult = { 
    identityPoolName: '', 
    identityPoolId: '', 
    NextToken: '', 
    NextState: '', 
    error: '', 
    errorReason: '' 
    }; 

function listIdentityPoolsObservable(params) { 
    return Rx.Observable.create(observer => { 

    cognitoidentity.listIdentityPools(params) 
     .on('success', function(response) { 
     observer.next(response.data); 
     }) 
     .on('error', function(error, response) { 
     observer.error(error); 
     }) 
     .on('complete', function(response) { 
     if (response.error) { 
      observer.error(response.error) 
     } else { 
      observer.complete(response); 
     } 
     }).send(); 
    }); 
    }; 

    const source$ = listIdentityPoolsObservable(params) 
    .share() 
    .observeOn(Rx.Scheduler.asap); 

    const identityPoolsSource$ = source$.map(x => { 
    return x.IdentityPools; 
    }) 
    .flatMap(x => { 
    return x; 
    }) 
    .filter(x => x.IdentityPoolName === event.identityPoolName) 
    .map(x => { 
    if (x.IdentityPoolName === event.identityPoolName) { 
     var dataArr = [x.IdentityPoolName, x.IdentityPoolId]; 
     return dataArr; 
    } 
    }) 
    .defaultIfEmpty(false); 

    const nextTokenSource$ = source$ 
    .filter(x => x.NextToken != null || undefined) 
    .map(x => { 
     if (x.NextToken != null || undefined) { 
     return x.NextToken; 
     } 
    }) 
    .defaultIfEmpty(false); 

    var identityAndToken = Rx.Observable 
    .forkJoin(identityPoolsSource$, nextTokenSource$) 
    .subscribe(x => { 
     //conditional statements... 
     callback(null, eventResult); 
    }); 

    function ExceptionExistence(eventResult) { 
    this.name = eventResult.errorName; 
    this.errorReason = eventResult.errorReason; 
    }; 
    ExceptionExistence.prototype = new Error(); 

};