
Just a quick question, guys: how do I set column headers with PySparkSQL? With pandas we can create a DataFrame and set the headers as follows:

import pandas as pd 
df = pd.read_csv('/file/path', sep='|', names = ['A','B']) 

With PySpark:

text_file = sc.textFile('path/file') 

On the other hand, even though I'm all set to read the file with Spark SQL, I haven't found how to set the header and delimiter, or how to name each column of the dataset the way pandas does. Any ideas on how to add a name to each column using PySparkSQL?

Update:

Following @CafeFeed I tried the following:

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df_2 = sqlContext.read.format('com.databricks.spark.csv').options(header='false', delimiter='|').load('path') 
df_2 

However, I get this exception:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-31-ad726583541b> in <module>() 
     2 sqlContext = SQLContext(sc) 
     3 
----> 4 df_2 = sqlContext.read.format('com.databricks.spark.csv').options(header='false', delimiter='|').load('/Users/user/GitHub/PySpark-Notes/ml-100k/u.user') 
     5 df_2 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/readwriter.pyc in load(self, path, format, schema, **options) 
    119   self.options(**options) 
    120   if path is not None: 
--> 121    return self._df(self._jreader.load(path)) 
    122   else: 
    123    return self._df(self._jreader.load()) 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    34  def deco(*a, **kw): 
    35   try: 
---> 36    return f(*a, **kw) 
    37   except py4j.protocol.Py4JJavaError as e: 
    38    s = e.java_exception.toString() 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o67.load. 
: java.lang.ClassNotFoundException: Failed to load class for data source: com.databricks.spark.csv. 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.csv.DefaultSource 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60) 
    at scala.util.Try.orElse(Try.scala:82) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60) 
    ... 14 more 

Thanks in advance, guys.


TL;DR: Use the packages option when you launch pyspark: '--packages com.databricks:spark-csv_2.10:1.4.0' – Tristan Reid
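For context, that flag goes on the launch command itself; a minimal sketch of the invocation, with the package coordinates taken verbatim from the comment above:

pyspark --packages com.databricks:spark-csv_2.10:1.4.0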


@TristanReid Thanks for your help! – tumbleweed

Answer


With Spark CSV you read the text file and set the separator using the delimiter option:

df = sqlContext.read \ 
    .format('com.databricks.spark.csv') \ 
    .options(header='false', delimiter='|') \ 
    .load(path) 
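Note that this reader comes from the external spark-csv package, which is exactly why the load in the question failed with ClassNotFoundException: the package has to be supplied when pyspark is launched, as Tristan Reid points out in the comments above.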

The schema, and with it the column names, can be set using the schema method:

sqlContext.read.schema(schema) 

where schema is a StructType:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("A", StringType(), True), StructField("B", StringType(), True)])

or by calling toDF:

df.toDF('A', 'B')
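For completeness, the names can also be attached without spark-csv by parsing the raw text file by hand; a minimal sketch, reusing the '|' delimiter and the A/B names from the question:

# Hypothetical alternative: split each line on '|' and name the columns directly
parts = sc.textFile('path/file').map(lambda line: line.split('|'))
df = sqlContext.createDataFrame(parts, ['A', 'B'])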