2013-03-11 58 views
2

在我們的ActivePivot項目中,我們聚合了大量的模擬數據向量(向量長度可以達到一百萬個值),並且內存消耗非常高。使用ActivePivot進行稀疏向量聚合

大多數時候,向量中的大多數值都是零。 ActivePivot可以利用它來壓縮矢量嗎?我們還可以聚合壓縮的矢量嗎?

回答

3

ActivePivot不會自動檢測到你聚合的向量是稀疏的並且應用了一些壓縮機制,但是由於ActivePivot是基於對象的,你可以編寫自己的聚合函數,將聚合壓縮向量(或者任何其他類型的數據實際上需要)。

現在,如果你需要從零開始矢量壓縮的例子這裏是一個簡單的:

/* 
* (C) Quartet FS 2013 
* ALL RIGHTS RESERVED. This material is the CONFIDENTIAL and PROPRIETARY 
* property of Quartet Financial Systems Limited. Any unauthorized use, 
* reproduction or transfer of this material is strictly prohibited 
*/ 
package com.quartetfs.biz.pivot.aggfun.impl; 

import java.util.Arrays; 

import com.quartetfs.fwk.IClone; 

/** 
* 
* Vector of primitive doubles, compressed by zero-elimination. 
* 
* @author Quartet FS 
* 
*/ 
public class DoubleVector implements IClone<DoubleVector> { 

    /** Underlying data */ 
    protected final double[] data; 

    /** Private internal constructor */ 
    private DoubleVector(double[] data) { 
     this.data = data; 
    } 

    /** 
    * Create a compressed vector from a raw vector. 
    * 
    * @param raw 
    * @return compressed vector 
    */ 
    public static DoubleVector compress(double[] raw) { 
     final int length = raw.length; 

     // How many 64-bits slots do we need to mark all of our values? 
     int bucketCount = 0; 
     while((bucketCount << 6) < length) { bucketCount++; } 

     // Count non-zeroes 
     int nonZeroes = 0; 
     for(int i = 0; i < length; i++) { 
      nonZeroes += raw[i] == 0.0 ? 0 : 1; 
     } 

     // Initialize the data structure: 
     // - one slot to store the size of the final vector 
     // - n bits packed in buckets to mark zeroes and non-zeroes 
     // - the non-zero doubles 
     final double[] data = new double[1 + bucketCount + nonZeroes]; 
     data[0] = Double.longBitsToDouble((long) length); 

     // Mark and copy the non-zeroes 
     nonZeroes = 0; 
     for(int b = 0; b < bucketCount; b++) { 

      // Clear bucket 
      data[1 + b] = Double.longBitsToDouble(0L); 

      final int from = b << 6; 
      final int to = Math.min(length, (b+1) << 6); 
      for(int i = from; i < to; i++) { 
       double value = raw[i]; 
       if(value != 0.0) { 
        // Mark the non-zero value and copy the value 
        int bucketIdx = i >>> 6; 
        int shift = i & 0x3F; // Keep 6 bits 
        final long bit = 1L << shift; 

        long bucket = Double.doubleToLongBits(data[1 + bucketIdx]); 
        bucket = bucket | bit; 
        data[1 + bucketIdx] = Double.longBitsToDouble(bucket); 

        // Copy the value 
        data[1 + bucketCount + nonZeroes++] = value; 
       } 
      } 
     } 

     return new DoubleVector(data); 
    } 

    /** Deep clone implementation */ 
    public DoubleVector clone() { 
     return new DoubleVector(data.clone()); 
    } 

    public int length() { 
     return (int) Double.doubleToLongBits(data[0]); 
    } 

    /** 
    * 
    * Add this vector to another vector, then return the result. 
    * 
    * @param vector 
    * @return sum vector 
    */ 
    public DoubleVector add(DoubleVector other) { 
     return add(other, false); 
    } 

    /** 
    * 
    * Add this vector to another vector, then return the result. 
    * 
    * @param vector 
    * @param negative if true, the vector is actually subtracted 
    * @return sum vector 
    */ 
    public DoubleVector add(DoubleVector other, boolean negative) { 

     final int length = (int) Double.doubleToLongBits(this.data[0]); 
     if((int) Double.doubleToLongBits(other.data[0]) != length) { 
      throw new IllegalArgumentException("Cannot aggregate vectors of different lengths."); 
     } 


     // How many 64-bits slots do we need to mark all of our values? 
     int bucketCount = 0; 
     while((bucketCount << 6) < length) { bucketCount++; } 

     // How many non-zeroes does the result bear? 
     // (we do not try to detect new zeroes caused by the sum) 
     int nonZeroes = 0; 
     for(int b = 0; b < bucketCount; b++) { 
      nonZeroes += Long.bitCount(Double.doubleToLongBits(this.data[1 + b]) | Double.doubleToLongBits(other.data[1 + b])); 
     } 

     // Allocate the data of the result 
     final double[] result = new double[1 + bucketCount + nonZeroes]; 
     result[0] = Double.longBitsToDouble(length); 
     for(int b = 0; b < bucketCount; b++) { 
      result[1 + b] = Double.longBitsToDouble(Double.doubleToLongBits(this.data[1 + b]) | Double.doubleToLongBits(other.data[1 + b])); 
     } 

     // Loop on both vectors, and sum 
     int a = 0; 
     int b = 0; 
     int c = 0; 
     for(int i = 0; i < length; i++) { 

      final int bucketIdx = i >>> 6; 
      final int shift = i & 0x3F; // Keep 6 bits 
      final long bucketA = Double.doubleToLongBits(this.data[bucketIdx + 1]); 
      final long bucketB = Double.doubleToLongBits(other.data[bucketIdx + 1]); 

      long bitA = (bucketA >>> shift) & 0x1L; 
      long bitB = (bucketB >>> shift) & 0x1L; 

      double valueA = bitA == 0L ? 0.0 : this.data[1 + bucketCount + a++]; 
      double valueB = bitB == 0L ? 0.0 : other.data[1 + bucketCount + b++]; 
      if(bitA != 0L || bitB != 0L) { 
       result[1 + bucketCount + c++] = valueA + (negative ? -valueB : valueB); 
      } 
     } 

     return new DoubleVector(result); 
    } 


    /** 
    * @return the decoded content of the vector 
    */ 
    public double[] content() { 

     // How many 64-bits slots do we need to mark all of our values? 
     final int length = (int) Double.doubleToLongBits(data[0]); 
     int bucketCount = 0; 
     while((bucketCount << 6) < length) { bucketCount++; } 


     final double[] content = new double[length]; 

     int nonZeroes = 0; 
     for(int i = 0; i < length; i++) { 

      final int bucketIdx = i >>> 6; 
      final int shift = i & 0x3F; // Keep 6 bits 
      final long bucket = Double.doubleToLongBits(data[bucketIdx + 1]); 

      if(((bucket >>> shift) & 0x1L) != 0L) { 
       // The bit is set, this is a non-zero value 
       content[i] = data[1 + bucketCount + nonZeroes++]; 
      } 

     } 

     return content; 
    } 

    /** @return the compression ratio as a percentage */ 
    public double compressionRatio() { 
     long originalSize = 16L + 8 * Double.doubleToLongBits(data[0]); 
     long compressedSize = 16L + 8L + 16L + 8 * data.length; 

     return 0.01 * (100L * compressedSize/originalSize); 
    } 


    @Override 
    public String toString() { 
     return Arrays.toString(content()); 
    } 


    /** Useful helper method that can be used from the debugger */ 
    protected static String binaryString(final long b) { 
     final String binary = Long.toBinaryString(b); 
     final int length = binary.length(); 
     final StringBuffer buffer = new StringBuffer(); 
     if(binary.length() <= 64) { 
      for(int i = 0; i < (64 - length); i++) { buffer.append('0'); } 
      buffer.append(binary); 
      return buffer.toString(); 
     } else { 
      return binary.substring(length - 64); 
     } 
    } 

    /** 
    * Some test. 
    * 
    * @param args 
    */ 
    public static void main(String[] args) { 
     double[] v1 = new double[] { 0.0, 0.0, 2.0, 0.0, 0.0, 5.0, 0.0, 6.0, 7.0, 8.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 }; 
     double[] v2 = new double[] { 10.0, 10.0, 8.0, 0.0, 0.0, 5.0, 0.0, 4.0, 3.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 10.0 }; 
     double[] sum = v1.clone(); 
     for(int i = 0; i < sum.length; i++) { sum[i] += v2[i]; } 
     DoubleVector c1 = DoubleVector.compress(v1); 
     DoubleVector c2 = DoubleVector.compress(v2); 
     DoubleVector cSum = c1.add(c2); 
     System.out.println(Arrays.toString(v1) + " -> " + c1 + " (" + c1.compressionRatio() * 100 + "%)"); 
     System.out.println(Arrays.toString(v2) + " -> " + c2 + " (" + c2.compressionRatio() * 100 + "%)"); 
     System.out.println(Arrays.toString(sum) + " -> " + cSum + " (" + cSum.compressionRatio() * 100 + "%)"); 
    } 

} 

這裏是基本代碼聚集那些有ActivePivot聚合函數:

/* 
* (C) Quartet FS 2013 
* ALL RIGHTS RESERVED. This material is the CONFIDENTIAL and PROPRIETARY 
* property of Quartet Financial Systems Limited. Any unauthorized use, 
* reproduction or transfer of this material is strictly prohibited 
*/ 
package com.quartetfs.biz.pivot.aggfun.impl; 

import com.quartetfs.fwk.QuartetPluginValue; 

/** 
* Aggregation function that sums compressed vectors of doubles. 
* 
* @author Quartet FS 
* 
*/ 
@QuartetPluginValue(interfaceName = "com.quartetfs.biz.pivot.aggfun.IAggregationFunction") 
public class DoubleVectorSum extends GenericAggregationFunction<DoubleVector, DoubleVector> { 

    /** serialVersionUID */ 
    private static final long serialVersionUID = 11699698472584733L; 

    public DoubleVectorSum() { 
     super("VectorSum"); 
    } 

    @Override 
    public String description() { return "Function to sum compressed double vectors"; } 

    @Override 
    protected DoubleVector aggregate(boolean removal, DoubleVector aggregate, DoubleVector input) { 
     return aggregate.add(input, removal); 
    } 

    @Override 
    protected DoubleVector merge(boolean removal, DoubleVector main, DoubleVector contribution) { 
     return main.add(contribution, removal); 
    } 

    @Override 
    protected DoubleVector cloneAggregate(DoubleVector aggregate) { 
     return aggregate.clone(); 
    } 

}