2016-09-12

I have a DataFrame with two columns: id and value. I want to update value based on another mapping, but I get org.apache.thrift.transport.TTransportException while creating the new RDD during the update.

df.collect.foreach({ 
    df[value] = if (df[id] != 'unknown') mapper.value(df[id]) else df[value] 
    }) 

Is this the right way to do it?

Here is what I tried:

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../"  
val input = sc.textFile(mappingPath) 

The input is a list of JSONs, where each line is a JSON that I map to a POJO class CountryInfo using MappingUtils, which takes care of the JSON parsing and conversion:

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 
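Since MappingsList is collected to the driver, a common pattern (a sketch, assuming the map fits in driver memory) is to broadcast it so every executor shares one read-only copy instead of re-serializing the map with each task:

```scala
// Broadcast the driver-side map so executors share one read-only copy.
// `sc` is the SparkContext already in scope; MappingsList is the map built above.
val mappingsBc = sc.broadcast(MappingsList)

// Inside transformations, read it via .value:
// val info = mappingsBc.value.get(itemId)
```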


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 
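Note that this match is non-exhaustive: when the itemId is absent from the map, MappingsList.get returns None and showCountryInfo throws a MatchError at runtime. A safer sketch handles None explicitly (the default CountryInfo here is hypothetical; adjust it to your POJO):

```scala
// Handle the missing-key case instead of crashing with a MatchError.
def showCountryInfo(x: Option[CountryInfo]): CountryInfo = x match {
  case Some(s) => s
  case None    => new CountryInfo() // hypothetical default; substitute a sensible fallback
}
```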


val events = sqlContext.sql("select itemId from EventList") 

val itemList = events.map(row => { 
    val itemId = row.getAs[String](0) 
    val countryInfo = showCountryInfo(MappingsList.get(itemId)) 
    val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry() 
    val language = countryInfo.getLanguage() 

    Row(itemId, country, language) 
}) 

But I keep getting this error:

org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:362)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:284)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:191)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:211)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:207)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:304)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I am using Spark 1.6.

+0

You should explain a bit more! –

+0

@AlbertoBonsanto I want to update the column 'value' based on the mapper – Swetha

+1

I don't understand. What mapper? What is id used for? What is the schema of the DF? –

Answer

1

Your question is a bit ambiguous.

Don't collect large RDDs unnecessarily.

When a collect operation is issued on an RDD, the whole dataset is copied to the driver, i.e. the master node. If the dataset is too large to fit in the driver's memory, an out-of-memory exception is thrown; take or takeSample can be used to retrieve only a bounded number of elements.

The way you are using collect is incorrect (on a large DataFrame it can cause an OOM).
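For instance, instead of collect, a bounded fetch keeps the driver safe (a sketch; df stands in for your DataFrame):

```scala
// Pull at most 10 rows to the driver instead of the whole dataset.
df.take(10).foreach(println)

// Or a random sample of up to 10 elements from the underlying RDD:
df.rdd.takeSample(withReplacement = false, num = 10).foreach(println)
```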

1) To update a column or add a new one, you can use withColumn:

DataFrame withColumn(java.lang.String colName, Column col) 
Returns a new DataFrame by adding a column or replacing the existing column that has the same name. 

2) To apply a condition that depends on another data structure, you can use the when/otherwise syntax, as in:

Apache Spark, add an "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame example

import org.apache.spark.sql.functions._ 
val sqlcont = new org.apache.spark.sql.SQLContext(sc) 
val df1 = sqlcont.jsonRDD(sc.parallelize(Array(
     """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": ""}""", 
     """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": ""}""", 
     """{"year":2015, "make": "Chevy", "model": "Volt", "comment": "", "blank": ""}""" 
    ))) 

val makeSIfTesla = udf {(make: String) => 
    if(make == "Tesla") "S" else make 
} 
df1.withColumn("make", makeSIfTesla(df1("make"))).show 
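The same replacement can also be written with when/otherwise directly, without a UDF (a sketch against the df1 defined above):

```scala
import org.apache.spark.sql.functions.{when, col}

// Same effect as the UDF above, expressed as a Catalyst expression:
df1.withColumn("make", when(col("make") === "Tesla", "S").otherwise(col("make"))).show
```

Expressing the condition this way lets Spark optimize it as part of the query plan rather than calling into opaque UDF code per row.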

The above can also be achieved like this:

val rdd = sc.parallelize(
    List((2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt"))
)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val dataframe = rdd.toDF()

dataframe.foreach(println)

dataframe.map(row => {
    val row1 = row.getAs[String](1)
    val make = if (row1.toLowerCase == "tesla") "S" else row1
    Row(row(0), make, row(2))
}).collect().foreach(println)

//[2012,S,S] 
//[1997,Ford,E350] 
//[2015,Chevy,Volt] 
+0

It works for small queries, but when it gets complex I always get the error – Swetha

+0

Do you mean that with 'collect' you get the error on large data? –

+0

Updated the question with the error – Swetha