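Multithreading on a shared list
I have a scenario where there will be a list of websites and a block of code that crawls them. Is it possible to do this in a multithreaded way, so that each thread takes 5 or more websites from the list and crawls them independently, while guaranteeing that no thread picks a website that another thread has already taken?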
List <String> websiteList;
//crawling code block here
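You can use a LinkedBlockingQueue: put everything from websiteList into the queue and share that queue between all the threads. Every thread then polls the queue. The timed poll blocks for up to the given timeout, and the queue guarantees that each element is handed to exactly one thread.
Something like: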
String site;
// The timed poll returns null once the queue has stayed empty for the whole timeout.
while ((site = queue.poll(timeout, TimeUnit.SECONDS)) != null) {
    // process site
}
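To cover the "5 or more websites per thread" part of the question, the same idea can be extended with drainTo. The following is only a minimal sketch: the class name QueueCrawlSketch, the crawl method (a stand-in for the real crawling code block), the batch size constant and the 100 ms timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueCrawlSketch {
    static final int BATCH_SIZE = 5; // assumed batch size, taken from the question

    // Hypothetical stand-in for the real crawling code: it only records the site.
    static void crawl(String site, List<String> crawled) {
        crawled.add(site);
    }

    // Crawls every site once using threadCount workers and returns what was crawled.
    static List<String> crawlAll(List<String> websiteList, int threadCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(websiteList);
        List<String> crawled = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(() -> {
                List<String> batch = new ArrayList<>(BATCH_SIZE);
                try {
                    String first;
                    // Wait briefly for one site; null means the queue stayed empty.
                    while ((first = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                        batch.clear();
                        batch.add(first);
                        // Take up to BATCH_SIZE - 1 more sites without blocking.
                        queue.drainTo(batch, BATCH_SIZE - 1);
                        for (String site : batch) {
                            crawl(site, crawled);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop this worker quietly
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return crawled;
    }
}
```

Because poll and drainTo both remove elements from the queue, a site that one worker has taken can never be seen by another worker.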
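You could try a DoubleBufferedList. It lets you add lists and single entries from multiple threads, and take the accumulated list from multiple threads, in a completely lock-free way.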
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicMarkableReference;

public class DoubleBufferedList<T> {
    // Atomic reference so I can atomically swap it through.
    // Mark = true means I am adding to it so momentarily unavailable for iteration.
    private AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    // Get and replace the current list.
    public List<T> get() {
        // Atomically grab and replace the list with an empty one.
        List<T> empty = newList();
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
            // Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfully replaced an unmarked list with an empty list!
        return it;
    }

    // Grab and lock the list in preparation for append.
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    // Release the list.
    private void release(List<T> it) {
        // Unmark it. compareAndSet is used instead of attemptMark because
        // attemptMark is documented as being allowed to fail spuriously.
        if (!list.compareAndSet(it, it, true, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    // Add an entry to the list.
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    // Add many entries to the list.
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    // Add a number of entries.
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }
}
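I would suggest one of these 3 solutions:
Keep it simple:
synchronized (list) {
    // get and remove 5 websites from the list
}
If you can change the list type, you could use a BlockingQueue.
If you cannot change the list type, you could use Collections.synchronizedList(list).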
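As a rough illustration of the "keep it simple" option, the synchronized block could be filled in as below. This is a sketch only: the class name SynchronizedBatchSketch and the method name takeBatch are made up for the example, and it assumes every thread locks on the same shared list object.

```java
import java.util.ArrayList;
import java.util.List;

class SynchronizedBatchSketch {
    // Removes and returns up to batchSize sites from the front of the shared list.
    // An empty result means there is nothing left to crawl.
    static List<String> takeBatch(List<String> list, int batchSize) {
        synchronized (list) {
            int n = Math.min(batchSize, list.size());
            List<String> head = list.subList(0, n);
            List<String> batch = new ArrayList<>(head); // copy before removing
            head.clear(); // clearing the view removes those entries from the shared list
            return batch;
        }
    }
}
```

Each worker thread would call takeBatch(websiteList, 5) in a loop and stop when it gets back an empty list. Since the copy and the removal happen inside the same synchronized block, two threads cannot receive the same website.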
Elegant > simple – 2013-04-23 10:04:04
You can use a structure that all interested consumers can share, for example a BlockingQueue (note: error handling is omitted for clarity):
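import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// The wrapper class name is arbitrary; the original snippet did not show one.
public class SharedQueueDemo {
    public static void main(String[] args) throws Exception {
        // for test purposes add 10 integers
        final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
        for (int i = 0; i < 10; i++) {
            queue.add(i);
        }
        // three consumers share the same queue
        new Thread(new MyRunnable(queue)).start();
        new Thread(new MyRunnable(queue)).start();
        new Thread(new MyRunnable(queue)).start();
    }

    static class MyRunnable implements Runnable {
        private final Queue<Integer> queue;

        MyRunnable(Queue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!queue.isEmpty()) {
                // poll() can still return null if another thread emptied the queue first
                Integer data = queue.poll();
                if (data != null) {
                    System.out.println(Thread.currentThread().getName() + ": " + data);
                }
            }
        }
    }
}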
When the Queue is empty, the Threads exit and the program ends.
Prefer a BlockingQueue, so that your threads can take items from it and fill your list as they finish their crawling work. – 2013-04-23 09:59:03
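If your websiteList is ready before the threads start running and does not change while they run, just partition the list into groups of 5 and start one thread per partition. – George 2013-04-23 10:07:54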
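The partitioning approach from the comment above could look roughly like this. It is a sketch under assumptions: the names PartitionSketch, partition and crawlInPartitions are invented for the example, and the crawler parameter stands in for the real crawling code block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PartitionSketch {
    // Splits the list into consecutive chunks of at most `size` elements.
    static List<List<String>> partition(List<String> sites, int size) {
        List<List<String>> parts = new ArrayList<>();
        for (int from = 0; from < sites.size(); from += size) {
            int to = Math.min(from + size, sites.size());
            parts.add(new ArrayList<>(sites.subList(from, to))); // copy, so chunks are independent
        }
        return parts;
    }

    // Starts one thread per chunk and waits for all of them to finish.
    static void crawlInPartitions(List<String> websiteList, int size, Consumer<String> crawler) {
        List<Thread> threads = new ArrayList<>();
        for (List<String> part : partition(websiteList, size)) {
            Thread t = new Thread(() -> part.forEach(crawler));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```

No locking is needed here because every thread owns its own chunk. The trade-off is that the work is fixed up front, so a thread that finishes early cannot pick up sites from a slower one.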