我有一個pyspark應用程序。我複製了一個配置單元表到我的hdfs目錄,& python我sqlContext.sql
在這個表上的查詢。現在這個變量是我調用rows
的數據幀。我需要隨機洗牌rows
,所以我不得不將它們轉換爲行列表rows_list = rows.collect()
。所以然後我shuffle(rows_list)
洗刷列表到位。我採取隨機行的數量我需要x
:保存行列表到pyspark中的蜂巢表
for r in range(x): allrows2add.append(rows_list[r])
現在我想allrows2add保存爲蜂巢表或追加現有的蜂巢表(較容易做到)。問題是,我不能做到這一點:
all_df = sc.parallelize(allrows2add).toDF()
不能做到這一點,模式無法推斷 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
沒有把整個架構。 rows
的模式有117列,所以我不想輸入它們。有沒有辦法提取rows
的模式來幫助我使allrows2add數據框或以某種方式保存爲配置單元表? 我可以做 rows.printSchema()
但不知道如何把它變成一個模式格式作爲變量傳遞toDF()
而不必解析所有文字
感謝
增加對循環信息
#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
非常感謝這項工作。我去訪問'schema'值。我試圖解決的一件事是爲什麼這麼慢(無論是將數據框轉換爲行列表,還是寫入parquet文件或嘗試附加蜂巢表等簡單的東西) - 但這可能與我的系統與api本身。 – KBA
不看你的數據/代碼,我不能確定。你的輸入文件是否分割成分區?如果它是單個分區,則Spark不會並行加載。 –
我只是編輯我上面的原始帖子,以顯示更多信息。我對Spark很新,所以我不是100%確定的,但是我從一個配置單元表(我從配置單元服務器複製到我的hdfs目錄)加載了我的輸入。如果您有任何建議,或者可以將我指向資源(我也在Scala中嘗試了這一點,那麼Scala代碼將會很好) - 那太棒了!非常感謝 – KBA