2017-10-10 87 views
2

我想重寫Clojure中的Spark結構化流示例。在Clojure中寫Spark結構化流示例時出現錯誤

的例子是用Scala編寫如下:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

(ns flambo-example.streaming-example 
    (:import [org.apache.spark.sql Encoders SparkSession Dataset Row] 
      [org.apache.spark.sql.functions] 
      )) 

(def spark 
    (-> 
    (SparkSession/builder) 
    (.appName "sample") 
    (.master "local[*]") 
    .getOrCreate) 
) 


(def lines 
    (-> spark 
     .readStream 
     (.format "socket") 
     (.option "host" "localhost") 
     (.option "port" 9999) 
     .load  
    ) 
) 

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap #(clojure.string/split % #" "))  
    )) 

上述代碼導致以下例外。

;;由java.lang.IllegalArgumentException引發 ;;找不到匹配的方法:flatMap for class ;; org.apache.spark.sql.Dataset

我該如何避免錯誤?

回答

1

您必須按照簽名。 Java的Dataset API提供的Dataset.flatMap兩種實現方式,一是這需要scala.Function1

def flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U] 

,第二個這需要星火自己o.a.s.api.java.function.FlatMapFunction

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] 

前者是沒有用處的,但是你應該能夠使用後者。 RDD API flambouses macros to create Spark friendly adapters可以通過flambo.api/fn進行訪問 - 我不確定這些是否可以直接與Datasets一起使用,但如果需要,您應該可以調整它們。

由於您不能依賴隱式Encoders您還必須提供與返回類型相匹配的顯式編碼器。

總體而言,你需要的東西左右:

(def words 
    (-> lines 
    (.as (Encoders/STRING))  
    (.flatMap f e)  
)) 

其中f實現FlatMapFunctioneEncoder。一個示例實現:

(def words 
    (-> lines 
     (.as (Encoders/STRING))  
     (.flatMap 
     (proxy [FlatMapFunction] [] 
      (call [s] (.iterator (clojure.string/split s #" ")))) 
     (Encoders/STRING)))) 

,但我想這是有可能找到一個更好的。

在實踐中,我會避​​免輸入Dataset,並專注於DataFrameDataset[Row])。

相關問題