2016-06-20

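Flink streaming throws an exception while calculating a sum

I am using a provided Flink streaming example, and I want to calculate the sum of temperatures grouped by rack ID. Here is my code: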

static Properties properties = new Properties();

public static Properties getProperties()
{
    properties.setProperty("bootstrap.servers", "54.210.139.57:9092");
    properties.setProperty("zookeeper.connect", "54.210.139.57:2181");
    //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
    //properties.setProperty("group.id", "akshay");
    properties.setProperty("auto.offset.reset", "earliest");
    return properties;
}

@SuppressWarnings("rawtypes") 
public static void main(String[] args) throws Exception 
{ 
    StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    Properties props=Program.getProperties(); 
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    /*DataStream<String> dstream=env.addSource(new FlinkKafkaConsumer09<String>("TemperatureEvent",new SimpleStringSchema(), props)); 
    dstream.filter(dstream -> dstream.)*/ 
    DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)); 

    DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").sum(1); 

    ds1.print(); 
    env.execute("Temperature Consumer"); 
} 

When I try to execute this code, it throws the following exception:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) 
     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) 
     at org.apache.flink.client.program.Client.runBlocking(Client.java:248) 
     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) 
     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) 
     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192) 
     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243) 
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array). 
     at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:78) 
     at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:39) 
     at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:292) 
     at com.yash.main.Program.main(Program.java:38) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) 

I used sum(1) because my 0th field is rackId and my 1st field is temperature, as defined in the POJO TemperatureEvent:

public class TemperatureEvent 
{ 
    private int rackId; 
    private double temperature; 
    private long timeStamp; 

    public TemperatureEvent() 
    { 
    // TODO Auto-generated constructor stub 
    } 

public TemperatureEvent(int rackId, double temperature, long timeStamp) { 
    super(); 
    this.rackId = rackId; 
    this.temperature = temperature; 
    this.timeStamp = timeStamp; 
} 

public int getRackId() { 
    return rackId; 
} 

public void setRackId(int rackId) { 
    this.rackId = rackId; 
} 

public double getTemperature() { 
    return temperature; 
} 

public void setTemperature(double temperature) { 
    this.temperature = temperature; 
} 

public long getTimeStamp() { 
    return timeStamp; 
} 

public void setTimeStamp(long timeStamp) { 
    this.timeStamp = timeStamp; 
} 

@Override
public String toString() {
    //return String.format("TemperatureEvent [rackId=%s, temperature=%s, timeStamp=%s]",rackId, temperature, timeStamp);
    String str = getRackId() + "," + temperature + "," + getTimeStamp();
    return str;
}
}

What is the solution to this problem? How can I calculate the sum of temperatures grouped by rackId?

Answer

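You can only use index-based arguments with these methods if your type is a tuple type. TemperatureEvent is a POJO, so refer to the field by name instead. In your case it should work with .sum("temperature"), i.e. dstream.keyBy("rackId").sum("temperature").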
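
To make clear what to expect once the field name is used: on a keyed stream, sum keeps a running total per key and emits an updated value for every incoming record. The following is only a rough plain-Java sketch of that behaviour, not Flink code. The class names RollingSumSketch and Reading are hypothetical; Reading is a cut-down stand-in for TemperatureEvent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are used here.
final class RollingSumSketch {

    // Hypothetical stand-in for the TemperatureEvent POJO (only the fields needed here).
    static final class Reading {
        final int rackId;
        final double temperature;

        Reading(int rackId, double temperature) {
            this.rackId = rackId;
            this.temperature = temperature;
        }
    }

    // For each incoming reading, emit the running temperature total of its rack so far.
    static List<Double> rollingSums(List<Reading> readings) {
        Map<Integer, Double> totals = new HashMap<>();
        List<Double> emitted = new ArrayList<>();
        for (Reading r : readings) {
            // merge() adds the new temperature to the rack's existing total (or starts one).
            double updated = totals.merge(r.rackId, r.temperature, Double::sum);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> input = new ArrayList<>();
        input.add(new Reading(1, 20.0));
        input.add(new Reading(2, 30.0));
        input.add(new Reading(1, 5.0));
        System.out.println(rollingSums(input)); // prints [20.0, 30.0, 25.0]
    }
}
```

Note that the third value is 25.0, not 5.0: the total for rack 1 carries over from the first record, which is why the printed stream shows growing sums rather than one final number per rack.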

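Thanks, this solution solved my problem. Now suppose I want to calculate the "average" temperature per rackId within a time window instead of the "sum". How can I calculate the average? – Akki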
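
The thread does not include a reply to this follow-up. One common approach, offered here as an assumption rather than as a confirmed answer, is to keep a sum and a count per rack and window, then divide when the window closes. In Flink this would typically be a window applied to the keyed stream with a custom function. The plain-Java sketch below only illustrates the arithmetic for fixed-size (tumbling) windows. WindowAverageSketch, Reading and the "rackId@windowStart" key format are hypothetical names, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Flink classes are used here.
final class WindowAverageSketch {

    // Hypothetical stand-in for the TemperatureEvent POJO.
    static final class Reading {
        final int rackId;
        final double temperature;
        final long timeStamp;

        Reading(int rackId, double temperature, long timeStamp) {
            this.rackId = rackId;
            this.temperature = temperature;
            this.timeStamp = timeStamp;
        }
    }

    // Average temperature per rack and tumbling window, keyed as "rackId@windowStart".
    static Map<String, Double> averages(List<Reading> readings, long windowSizeMillis) {
        Map<String, double[]> acc = new TreeMap<>(); // value holds {sum, count}
        for (Reading r : readings) {
            // Align the timestamp to the start of its fixed-size window.
            long windowStart = r.timeStamp - (r.timeStamp % windowSizeMillis);
            double[] a = acc.computeIfAbsent(r.rackId + "@" + windowStart, k -> new double[2]);
            a[0] += r.temperature;
            a[1] += 1;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> input = new ArrayList<>();
        input.add(new Reading(1, 20.0, 1000L));
        input.add(new Reading(1, 30.0, 4000L));
        input.add(new Reading(1, 40.0, 6000L));
        input.add(new Reading(2, 10.0, 2000L));
        // prints {1@0=25.0, 1@5000=40.0, 2@0=10.0}
        System.out.println(averages(input, 5000L));
    }
}
```

Carrying the sum and count together is the key design point: a running average cannot be combined correctly across records without also knowing how many records it covers.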