Apache Flink: Skewed data distribution on KeyedStream

I have this Java code in Flink:

env.setParallelism(6); 

//Read from Kafka topic with 12 partitions 
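DataStream<String> line = env.addSource(myConsumer); 

//Not shown in the original: the step that turns `line` into the DataStream<Tuple2<String, Integer>> line_Num used below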

//Filter half of the records 
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd()); 
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder()); 

//Filter the other half 
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven()); 
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder()); 

//Join all the data again 
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2); 

//Window 
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U 
       .keyBy(1) 
       .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 
       .reduce(new Reducer()); 

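The problem is that the window should be able to run with parallelism = 2, because there are two different groups of data, keyed by "odd" and "even" in the second String of the Tuple3. Everything else runs with parallelism 6, but the window effectively runs with parallelism = 1, and my requirements need it to run with parallelism = 2.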

The functions used in the code are the following:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> { 

    public boolean filter(Tuple2<String, Integer> line) throws Exception { 
     Boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0; 
     return isOdd; 
    } 
}; 


public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> { 

    public boolean filter(Tuple2<String, Integer> line) throws Exception { 
     Boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0; 
     return isEven; 
    } 
}; 

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> { 

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception { 
     Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "odd", line.f1); 
     return newLine; 
    } 
}; 


public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> { 

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception { 
     Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "even", line.f1); 
     return newLine; 
    } 
}; 

public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> { 

    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1, 
      Tuple3<String, String, Integer> line2) throws Exception { 
     Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]); 
     Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]); 
     Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(String.valueOf(sum) + 
       " " + String.valueOf(sumTS), line1.f1, line1.f2 + line2.f2); 
     return newLine; 
    } 
}; 

Thanks for your help!

Solution: I changed the keys from "odd" and "even" to "odd0000" and "even1111", and now it works properly.

Answer

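Keys are assigned to worker threads by hash partitioning: the key value is hashed, and the thread is chosen by taking the hash modulo the number of workers. With two keys and two threads, there is a good chance (about 50% if the hash spreads evenly) that both keys are assigned to the same thread.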

You can try different key values whose hashes are distributed across the two threads.
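The "hash modulo number of workers" description is a simplification. To our understanding, Flink (1.2 and later) first runs the key's hashCode() through a murmur hash and reduces it modulo the maximum parallelism to get a key group, and then assigns contiguous ranges of key groups to subtasks. Because each subtask owns a whole range, two distinct keys collide whenever their key groups fall in the same range. The plain-Java sketch below models those two steps so you can check offline whether two keys share a subtask, and brute-forces suffixes like the ones used in the question. Assumptions: `KeyAssignmentSketch` and `findDistinctKeys` are hypothetical names; the hash constants and the range formula are our reading of Flink's `MathUtils` and `KeyGroupRangeAssignment`, not a verified copy; the default max parallelism of 128 is assumed; and the sketch hashes the String itself, while a positional `keyBy(1)` may wrap the field in a tuple key. For exact answers, prefer calling `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` from flink-runtime if your version provides it.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Flink) modeling key -> key group -> subtask routing.
// Constants and formulas follow our reading of Flink's MathUtils and
// KeyGroupRangeAssignment; verify against the Flink version you run.
public class KeyAssignmentSketch {

    // murmur3 32-bit finalization mix (fmix32).
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // murmur3-style hash of a single int (seed 0), folded to a non-negative value.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Step 1: key -> key group in [0, maxParallelism).
    public static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> subtask index in [0, parallelism).
    // Each subtask owns a contiguous range of about maxParallelism / parallelism key groups.
    public static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Hypothetical brute-force search: keep each base key as-is if its subtask is free,
    // otherwise append 1, 2, 3, ... until it lands on a subtask no earlier key uses.
    // Returns null if some base key finds no free subtask within maxTries candidates.
    public static String[] findDistinctKeys(String[] bases, int maxParallelism,
                                            int parallelism, int maxTries) {
        String[] result = new String[bases.length];
        boolean[] taken = new boolean[parallelism];
        for (int b = 0; b < bases.length; b++) {
            for (int s = 0; s < maxTries && result[b] == null; s++) {
                String candidate = (s == 0) ? bases[b] : bases[b] + s;
                int subtask = subtaskFor(candidate, maxParallelism, parallelism);
                if (!taken[subtask]) {
                    taken[subtask] = true;
                    result[b] = candidate;
                }
            }
            if (result[b] == null) {
                return null;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default max parallelism for small jobs
        int parallelism = 2;      // the window parallelism the asker wants
        for (String key : new String[] {"odd", "even", "odd0000", "even1111"}) {
            System.out.println(key + " -> subtask " + subtaskFor(key, maxParallelism, parallelism));
        }
        String[] keys = findDistinctKeys(new String[] {"odd", "even"}, maxParallelism, parallelism, 1000);
        System.out.println("distinct keys: " + Arrays.toString(keys));
    }
}
```

The search only helps for a small, fixed set of keys known in advance, and its result depends on both the parallelism and the max parallelism, so it would need to be rerun if either setting changes.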

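Thanks! I changed the keys from "odd"/"even" to "odd0000"/"even1111" and now it works :D. The only issue is that I have two workers and both threads run on the same machine. Is there any way to force each thread onto a different worker? – Franmoti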

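That depends on your setup. You can start each worker with a single slot, but depending on whether you run on YARN, Mesos, or something else, you may not be able to control where the workers are started. –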

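I'm running in standalone mode. Yes, I thought about using one task slot, but I wanted some performance... haha. I'll leave it as it is, but the throughput is limited by one of the two workers :( – Franmoti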
