I am working with Spark SQL DataFrames. I want to map over my data and keep the original Row format.
df = sql.read.parquet("toy_data")
df.show()
+-----------+----------+
| x| y|
+-----------+----------+
| -4.5707927| -5.282721|
| -5.762503| -4.832158|
| 7.907721| 6.793022|
| 7.4408655| -6.601918|
| -4.2428184| -4.162871|
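For context, each record comes back as a pyspark.sql.Row, which behaves like a named tuple (fields accessible by name or by position); a quick sketch of what I mean, using the field names from the schema above:

from pyspark.sql import Row

# A Row is a subclass of tuple: fields are accessible by name or by position.
p = Row(x=-4.5707927, y=-5.282721)
print(p.x)        # -4.5707927
print(p[1])       # -5.282721
print(tuple(p))   # (-4.5707927, -5.282721)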
I have a list of tuples structured as follows:
(Row(x=-8.45811653137207, y=-5.179722309112549), ((-1819.748514533043, 47.745243303477764), 333))
The first element is a point, and the second element is a (sum_of_points, number_of_points) tuple.
When I divide sum_of_points by number_of_points, like this:
new_centers = center_sum_num.map(lambda tup: np.asarray(tup[1][0])/tup[1][1]).collect()
I get the following, which is an array of numpy arrays:
[array([-0.10006594, -6.7719144 ]), array([-0.25844196, 5.28381418]), array([-5.12591623, -4.5685448 ]), array([ 5.40192709, -4.35950824])]
However, I want them in the original point format, like this:
[Row(x=-5.659833908081055, y=7.705344200134277), Row(x=3.17942214012146, y=-9.446121215820312), Row(x=9.128270149230957, y=4.5666022300720215), Row(x=-6.432034969329834, y=-4.432190895080566)]
Meaning, I don't want an array of numpy arrays - I want an array of Row(x=..., y=...) objects.
How do I do that?
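What I imagine is rebuilding the Row inside the final map instead of producing a numpy array; a minimal sketch of that idea (untested, and assuming every point has exactly the two fields x and y):

from pyspark.sql import Row

# Sketch: divide component-wise and rebuild a Row instead of a numpy array.
# Here tup[1][0] is (sum_x, sum_y) and tup[1][1] is number_of_points.
new_centers = center_sum_num.map(
    lambda tup: Row(x=tup[1][0][0] / tup[1][1],
                    y=tup[1][0][1] / tup[1][1])
).collect()

That way collect() would hand back Row(x=..., y=...) objects directly, but I am not sure whether this is the idiomatic way to do it.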
My full code is attached below for reference:
new_centers = [Row(x=-5.659833908081055, y=7.705344200134277), Row(x=3.17942214012146, y=-9.446121215820312), Row(x=9.128270149230957, y=4.5666022300720215), Row(x=-6.432034969329834, y=-4.432190895080566)]
old_centers = None
iteration = 0
while old_centers is None or not has_converged(old_centers, new_centers, epsilon) and iteration < max_iterations:
    # update centers
    old_centers = new_centers
    # pair each point with its nearest center; nearest_center(...)[0] is the index
    center_pt_1 = points.rdd.map(lambda point: (old_centers[nearest_center(old_centers, point)[0]], (point, 1)))
    # accumulate ((sum_x, sum_y), number_of_points) per center
    center_sum_num = center_pt_1.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1]), a[1] + b[1]))
    # divide the sums by the counts; this is where the Rows become numpy arrays
    new_centers = center_sum_num.map(lambda tup: np.asarray(tup[1][0]) / tup[1][1]).collect()
    iteration += 1
return new_centers
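For reference, nearest_center and has_converged are helpers defined elsewhere in my code; roughly, nearest_center returns an (index, distance) pair and has_converged compares old and new centers against epsilon. A hypothetical sketch of their shape (not my actual implementations), assuming the centers stay in Row form:

import numpy as np

# Hypothetical sketches only; the real helpers live elsewhere in my code.
def nearest_center(centers, point):
    # Returns (index, distance) of the center closest to the point,
    # which is why nearest_center(...)[0] is used as an index above.
    dists = [np.hypot(point.x - c.x, point.y - c.y) for c in centers]
    i = int(np.argmin(dists))
    return i, dists[i]

def has_converged(old, new, epsilon):
    # Converged when every center moved less than epsilon.
    return all(np.hypot(o.x - n.x, o.y - n.y) < epsilon
               for o, n in zip(old, new))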