2017-04-26

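Load/deserialize matrices from MongoDB into PySpark and add a new vector column

I am learning Spark so that I can use its ML module to build a classifier.

I have done this task successfully with Pandas, but the data has grown and no longer fits in RAM. I have also had a good experience with Dask, but its machine learning library is not yet production-ready.

My data is stored in MongoDB and includes small images serialized with cPickle.

Here is a snippet of the code that creates the records: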

import os
import cPickle  # Python 2; on Python 3 use the pickle module instead

import numpy as np
import pymongo
from bson.binary import Binary

# file_paths and MONGO_CREDENTIALS are defined elsewhere.
records = []
for file_path in file_paths:
    for r in open(file_path):
        # 120x40 array standing in for one normalized image
        normalized_image = np.random.rand(120, 40)
        this_result = {'file_name': os.path.basename(file_path),
                       'normalized_image': Binary(cPickle.dumps(normalized_image, protocol=2)),
                       # other data
                       }
        records.append(this_result)


client = pymongo.MongoClient(MONGO_CREDENTIALS)
db_name = 'database_name'
client.drop_database(db_name)
database = client[db_name]
collection = database['data_sample']
collection.insert_many(records)
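To make the stored format concrete, here is a minimal round-trip sketch without MongoDB. It assumes Python 3 and the standard `pickle` module in place of `cPickle`; the helper names `serialize_image` and `deserialize_image` are hypothetical. It also assumes the payload may come back as a `bytearray`, which is how PySpark normally hands a binary column to Python code.

```python
import pickle

import numpy as np


def serialize_image(image):
    """Pickle a NumPy array with protocol 2, as in the snippet above."""
    return pickle.dumps(image, protocol=2)


def deserialize_image(payload):
    """Restore the array from raw bytes or a bytearray (hypothetical helper)."""
    # encoding="latin1" lets Python 3 read NumPy pickles written by Python 2.
    return pickle.loads(bytes(payload), encoding="latin1")


image = np.random.rand(120, 40)
payload = serialize_image(image)

# Simulate the value arriving as a bytearray rather than bytes.
restored = deserialize_image(bytearray(payload))
print(restored.shape, np.array_equal(image, restored))
```

If the records were written from Python 2 and are read from Python 3, the `encoding="latin1"` argument is the part that usually matters. For pickles written by Python 3 it is harmless.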

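From the ML documentation, my understanding is that classifiers work with the vectors and matrices defined in the pyspark.ml.linalg module.

I have managed to read the data from the database into a Spark DataFrame using the official MongoDB Spark connector. However, the images are still serialized: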

from pyspark.sql import SparkSession 
import pyspark.ml as ml 

spark = SparkSession \ 
    .builder \ 
    .appName("sparktest") \ 
    .config("spark.mongodb.input.uri", "mongodb://mongo.server/database_name.data_sample") \ 
    .getOrCreate() 
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load().drop('_id') 
df.printSchema() 


root 
|-- file_name: string (nullable = true) 
|-- normalized_image: binary (nullable = true) 
|-- parea: double (nullable = true) 
|-- sns: double (nullable = true) 
.... 

How can I deserialize them?

I also need to compute a histogram for each of these images and store it in a new column of the resulting DataFrame.
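As a rough illustration of the histogram step on its own, the sketch below turns one deserialized image into a fixed-length list of floats. The function name `image_histogram`, the bin count of 16, and the value range of 0 to 1 are all assumptions chosen for the example, not part of the original code.

```python
import numpy as np


def image_histogram(image, bins=16, value_range=(0.0, 1.0)):
    """Return a normalized intensity histogram as a list of floats.

    Hypothetical helper: bins and value_range are illustrative defaults.
    """
    counts, _ = np.histogram(np.asarray(image).ravel(), bins=bins, range=value_range)
    total = counts.sum()
    if total == 0:
        # No pixel fell inside value_range; return an all-zero histogram.
        return [0.0] * bins
    return (counts / float(total)).tolist()


sample = np.random.rand(120, 40)
hist = image_histogram(sample)
print(len(hist), sum(hist))
```

A plain list of floats like this can be passed to `pyspark.ml.linalg.Vectors.dense`, which is the vector type the ML estimators expect. Keeping the bin count fixed means every row gets a vector of the same length.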

Answer

So far, I have arrived at the following solution:

import cPickle  # Python 2; on Python 3 use the pickle module instead

import pyspark.ml as ml
import pyspark.sql.types
from pyspark.sql.functions import UserDefinedFunction

def deserialize_calc_histogram(ser_image):
    # Unpickle the image, then compute its histogram.
    # get_histogram is the author's own helper and is not shown here.
    return ml.linalg.Vectors.dense(get_histogram(data=cPickle.loads(str(ser_image)),
                                                 scale_factor=(4, 1.4)))

histo = UserDefinedFunction(deserialize_calc_histogram, ml.linalg.VectorUDT())
encode_boolean = UserDefinedFunction(lambda b: int(b), pyspark.sql.types.IntegerType())


df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load().\
    select(['bool_label', 'is_train', 'normalized_image'])

train = df.filter(df.is_train == True).\
    withColumn("norm_histogram", histo("normalized_image")).\
    withColumn("label", encode_boolean("bool_label")).\
    drop('normalized_image')

df.printSchema()

Output:

root 
|-- label: boolean (nullable = true) 
|-- is_train: boolean (nullable = true) 
|-- normalized_image: binary (nullable = true)