我想按產品價格對產品進行排序。下面是代碼 -如何刪除pyspark中產品價格爲空值的記錄
products = sc.textFile("/user/cloudera/sqoop_import/products")
獲取產品類別ID從第2列
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
集團從字符串類型轉換產品價格線按類別ID
productsGroupBy = productsMap.groupByKey()
按產品價格排序浮動:
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i)
我無法爲具有空值的產品價格輸入幾個值。那麼,有沒有辦法刪除這個特定字段的空值的記錄。請找出錯誤日誌如下─
17/05/28 00:48:25 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 6) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main process() File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream vs = list(itertools.islice(iterator, batch)) File "", line 2, in File "", line 2, in ValueError: could not convert string to float:
你試過像'productsGroupBy = productsMap.filter(拉姆達行: yourProductPriceColumn!=「Null」)。groupByKey()'在進入* for循環之前*? (否則,此過濾器也可以與* productsGroupBy *一起使用)。由於您已經導入了字符串值,因此使用此過濾功能,您將擁有RDD,而無需productPrice爲「Null」的行,因此您可以將字符串轉換爲float。 – titiro89
非常感謝@ titiro89 !!我運行了下面的代碼並得到了下面的結果:'for i in productsMap.filter(lambda line:line.split(「,」)[4] ==「」).collect():print(i)' 。 .. ** 685,31,TaylorMade SLDR鐵桿 - (鋼)4-PW,AW ,, 899.99,http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW %2C + AW ** –
此處,product_price已移至第5個索引,而不是第4個索引。所以,這樣的記錄應該被跳過! –