我目前正在開發一個應用程序,通過IP網絡遠程監視少數對象。網絡中的每個節點都會週期性地發送來自傳感器的更新數據(電壓,電流,溫度等)。如何從多個線程獲取數據?
我開始一個新的線程來處理每個遠程對象。但是我有一個從線程傳輸數據的問題。
將數據傳輸到主線程的最佳方式是什麼?我應該用我有什麼或不同的東西去?
我目前正在開發一個應用程序,通過IP網絡遠程監視少數對象。網絡中的每個節點都會週期性地發送來自傳感器的更新數據(電壓,電流,溫度等)。如何從多個線程獲取數據?
我開始一個新的線程來處理每個遠程對象。但是我有一個從線程傳輸數據的問題。
將數據傳輸到主線程的最佳方式是什麼?我應該用我有什麼或不同的東西去?
也許嘗試將您的數據放入BlockingQueue並在主線程中取數據。
實施例:
public class Producer implements Runnable { //this class puts objects with data into BlockingQueue
private BlockingQueue<Object> queue;
public Producer(BlockingQueue<Object> q) {
this.queue = q;
}
@Override
public void run() {
//place fot your instruction
Object yourData = new Object();
}
try {
queue.put(yourData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
這是從BlockingQueue的讀數據
public class Consumer implements Runnable{
private BlockingQueue<Object> queue;
public Consumer(BlockingQueue<Object> q){
this.queue=q;
}
@Override
public void run() {
try{
Object date = queue.take()
// your operations witha data
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
,或者使用的ExecutorService和未來類
的常用方法向此類有些問題是將IO從處理中分離出來,所以在你的情況下你需要從套接字讀取線程並將數據傳遞給工作線程進行處理。
插槽IO
與插座IO開始,因爲Java 7中的NIO API已經可用,這意味着你可以離開,以避免主線程阻塞到操作系統所需的線程。
您可以監聽傳入連接,並開始使用AsynchronousServerSocketChannel和AsynchronousSocketChannel這樣寫着:
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));
serverSocketChannel.accept(null, new AcceptAndReadHandler());
凡CompletionHandler s就這個樣子:
private class AcceptAndReadHandler implements CompletionHandler<AsynchronousSocketChannel,Void>
{
public void completed(AsynchronousSocketChannel channel, Void attribute)
{
serverSocketChannel.accept(null, this);
ReadHandler readHandler = new ReadHandler(channel);
channel.read(readHandler.getBuffer(), Void, readHandler)
}
}
public class ReadHandler implements CompletionHandler<Integer,Void>
{
private ByteBuffer buffer;
public ReadHandler(AsynchronousSocketChannel channel)
{
this.channel = channel;
this.buffer = ByteBuffer.allocate(1024);
}
public ByteBuffer getBuffer() { return this.buffer; }
public void completed(Integer read, Void attribute)
{
byte[] data = new byte[read];
buffer.get(data);
...
ReadHandler readHandler = new ReadHandler(channel);
channel.read(readHandler.getBuffer(), Void, readHandler).
}
}
的ReadHandler
需要足夠聰明,能夠將接收到的數據拼湊在一起,因爲不能保證塊在發送時將接收到數據。
兩種常見的方法是在每個消息的前面添加長度或使用分隔符。您可能還想看看像Netty這樣的API,我相信您不必編寫邏輯來重構消息。
處理
一旦你有一個完整的消息,你需要對其進行處理。要做到這一點simiplest辦法是放在一個隊列,以便您的代碼可能看起來像:
public static void main(String[] args)
{
boolean running = true;
Runtime.getRuntime().addShutdownHook(() -> running = false);
BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>();
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));
serverSocketChannel.accept(null, new AcceptAndReadHandler(queue));
while (running)
{
byte[] data = queue.take();
...
}
}
對於這個工作你需要爲了修改CompletionHandler s到通過queue
,使ReadHandler
可以加data
就可以了。
如果你想多線程的處理,那麼你應該使用Executor,像這樣:
public static void main(String[] args)
{
boolean running = true;
Runtime.getRuntime().addShutdownHook(() -> running = false);
Executor executor = Executors.newFixedThreadPool(10);
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));
serverSocketChannel.accept(null, new AcceptAndReadHandler(executor));
while (running)
{
...
}
}
同樣,你需要修改CompletionHandler S,這一次這樣你就可以通過executor
爲ReadHandler
提交的處理邏輯爲。
這兩種方法的重要區別在於,對於多線程示例,不能保證處理順序。
如果您發佈了一些代碼 – Hazaart
,可能會更容易回答您的問題「但是,我從傳輸數據中遇到問題。」什麼問題? – Brandon
「我應該用我擁有的東西去」,那麼,你有什麼? – Korashen