3

運用spotifySCIO編寫Dataflow一個任務寫入PubSub的流雲存儲,下面的兩個例子e.g1e.g2PubSubGCS,但得到以下錯誤的下面的代碼錯誤使用數據流

錯誤

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

代碼

object StreamingPubSub { 
    def main(cmdlineArgs: Array[String]): Unit = { 
// set up example wiring 
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs) 
val dataflowUtils = new DataflowExampleUtils(opts) 
dataflowUtils.setup() 

val sc = ScioContext(opts) 


sc.pubsubTopic(opts.getPubsubTopic) 
.timestampBy { 
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong) 
    } 
.withFixedWindows((Duration.standardHours(1))) 
.groupBy(_ => Unit) 
.toWindowed 
.toSCollection 
.saveAsTextFile(args("output")) 


val result = sc.close() 

// CTRL-C to cancel the streaming pipeline 
    dataflowUtils.waitToFinish(result.internal) 
    } 
} 

我可能混合了界PCollection窗口的概念,是有辦法做到這一點還是需要應用一些改造要做到這一點,任何人都可以協助這一

回答

3

我相信下面的SCIO的saveAsTextFile使用Dataflow的Write轉換,它只支持有限的PCollections。儘管這是我們正在研究的內容,但數據流並未提供直接API來將無界PCollection編寫到Google雲端存儲。

要在某個地方堅持無限的PCollection,請考慮BigQuery,Datastore或Bigtable等。在SCIO的API中,您可以使用例如saveAsBigQuery

+0

感謝您的快速響應 – DAR