2016-04-27 57 views
3

我有一個pyspark應用程序。我複製了一個配置單元表到我的hdfs目錄,& python我sqlContext.sql在這個表上的查詢。現在這個變量是我調用rows的數據幀。我需要隨機洗牌rows,所以我不得不將它們轉換爲行列表rows_list = rows.collect()。所以然後我shuffle(rows_list)洗刷列表到位。我採取隨機行的數量我需要x保存行列表到pyspark中的蜂巢表

for r in range(x): allrows2add.append(rows_list[r]) 現在我想allrows2add保存爲蜂巢表或追加現有的蜂巢表(較容易做到)。問題是,我不能做到這一點:

all_df = sc.parallelize(allrows2add).toDF()不能做到這一點,模式無法推斷 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

沒有把整個架構。 rows的模式有117列,所以我不想輸入它們。有沒有辦法提取rows的模式來幫助我使allrows2add數據框或以某種方式保存爲配置單元表? 我可以做 rows.printSchema()但不知道如何把它變成一個模式格式作爲變量傳遞toDF()而不必解析所有文字

感謝

增加對循環信息

#Table is a List of Rows from small Hive table I loaded using 
#query = "SELECT * FROM Table" 
#Table = sqlContext.sql(query).collect() 

for i in range(len(Table)): 

    rows = sqlContext.sql(qry) 
    val1 = Table[i][0] 
    val2 = Table[i][1] 
    count = Table[i][2] 
    x = 100 - count 

#hivetemp is a table that I copied from Hive to my hfs using: 
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup"; 
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy; 

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x) 

    rows = sqlContext.sql(query) 
    rows = rows.withColumn("col4", lit(10)) 
    rows = rows.withColumn("col5", lit(some_string)) 
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server 
    rows.saveAsParquetFile("rows"+str(i)+".parquet") 
#tried this before and heck slow also 
    #rows_list = rows.collect() 
    #shuffle(rows_list) 

回答

11

當架構不能被推斷時,通常有一個原因。 toDFcreateDataFrame函數的語法糖,默認情況下它僅使用前100行(despite the docs表示僅使用第一行)來確定模式應該是什麼。要改變這種情況,你可以增加採樣率來看看你的數據更大比例:

df = rdd.toDF(sampleRatio=0.2) 
# or... 
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2) 

它也可能是您的隨機樣本碰巧只取空值行對一些特定列。如果是這樣的話,您可以create a schema from scratch像這樣:

from pyspark.sql.types import * 
# all DataFrame rows are StructType 
# can create a new StructType with combinations of StructField 
schema = StructType([ 
    StructField("column_1", StringType(), True), 
    StructField("column_2", IntegerType(), True), 
    # etc. 
]) 
df = sqlContext.createDataFrame(rdd, schema=schema) 

或者,你可以從你通過訪問schema值先前創建的數據框的模式:

schema = df1.schema 
df2 = sqlContext.createDataFrame(rdd, schema=schema) 

請注意,如果您的RDD的行不是StructType(又名)對象而不是字典或列表,您將無法從它們創建數據框。如果你的RDD行是字典,你可以將它們轉換爲這樣的對象:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x)) 
# ** is to unpack the dictionary since the Row constructor 
# only takes keyword arguments 
+0

非常感謝這項工作。我去訪問'schema'值。我試圖解決的一件事是爲什麼這麼慢(無論是將數據框轉換爲行列表,還是寫入parquet文件或嘗試附加蜂巢表等簡單的東西) - 但這可能與我的系統與api本身。 – KBA

+1

不看你的數據/代碼,我不能確定。你的輸入文件是否分割成分區?如果它是單個分區,則Spark不會並行加載。 –

+0

我只是編輯我上面的原始帖子,以顯示更多信息。我對Spark很新,所以我不是100%確定的,但是我從一個配置單元表(我從配置單元服務器複製到我的hdfs目錄)加載了我的輸入。如果您有任何建議,或者可以將我指向資源(我也在Scala中嘗試了這一點,那麼Scala代碼將會很好) - 那太棒了!非常感謝 – KBA