2017-02-03 84 views
1

我需要使用多個線程來計算PI作業。問題是 - 當使用多個線程時,大部分時間總和計算不正確(小於它應該)。我花了相當長的一段時間來搞清楚,所以也許我忽略了一些明顯的東西。希望你的幫助。使用WinAPI的多線程。計算PI

P.S.有關於同樣任務的問題。但是,我的錯誤是不同的。

這是我的代碼。

#include <stdio.h> 
#include <Windows.h> 
#include <ctime> 

#define THREAD_COUNT 2 
#define N 10000000 
#define BLOCK_SIZE 43087 

typedef struct { 
    unsigned startPos; 
    int threadInd; 
    double threadSum; 
    bool isCalculating; 
} ThreadArg; 

HANDLE* threads; 
HANDLE* events; 

DWORD WINAPI ThreadProc(LPVOID); 
void calculate(double* sum, unsigned start); 

int main() { 
    unsigned cursor = 0; 
    ThreadArg* params = NULL; 
    threads = NULL; 

    if (THREAD_COUNT > 1) { 
     params = (ThreadArg*)malloc(sizeof(ThreadArg)*(THREAD_COUNT - 1)); 
     threads = (HANDLE*)malloc(sizeof(HANDLE)*(THREAD_COUNT - 1)); 
     events = (HANDLE*)malloc(sizeof(HANDLE)*(THREAD_COUNT - 1)); 
    } 
    for (int i = 0; i < THREAD_COUNT - 1; ++i) { 
     ThreadArg arg; 
     arg.isCalculating = true; 
     arg.startPos = cursor; 
     arg.threadInd = i; 
     arg.threadSum = 0; 
     params[i] = arg; 
     threads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ThreadProc, (LPVOID)(&params[i]), CREATE_SUSPENDED, 0); 
     events[i] = CreateEvent(NULL, TRUE, TRUE, NULL); 
     cursor += BLOCK_SIZE; 
    } 

    unsigned startTime = clock(); 
    for (int i = 0; i < THREAD_COUNT - 1; ++i) { 
     ResumeThread(threads[i]); 
    } 

    double mainSum = 0; 
    while (cursor < N) { 
     for (int i = 0; i < THREAD_COUNT - 1; ++i) { 
      if (!params[i].isCalculating) { 
       params[i].startPos = cursor; 
       SetEvent(events[i]); 
       cursor += BLOCK_SIZE; 
      } 
     } 
     calculate(&mainSum, cursor); 
     cursor += BLOCK_SIZE; 
    } 

    bool allFinished; 
    do { 
     allFinished = true; 
     for (int i = 0; i < THREAD_COUNT - 1; ++i) { 
      if (params[i].isCalculating) { 
       allFinished = false; 
       break; 
      } 
     } 
    } while(!allFinished); 

    for (int i = 0; i < THREAD_COUNT - 1; ++i) { 
     mainSum += params[i].threadSum; 
     CloseHandle(threads[i]); 
    } 

    printf("Time: %d\n", clock() - startTime); 
    printf("PI = %.7f\n", mainSum/N); 
    free(threads); 
    free(params); 
    system("pause"); 
} 

DWORD WINAPI ThreadProc(LPVOID lParam) { 
    ThreadArg* param = (ThreadArg*)lParam; 
    while (true) { 
     ResetEvent(events[param->threadInd]); 
     calculate(&param->threadSum, param->startPos); 
     InterlockedDecrement((LONG*)&param->isCalculating); 
     WaitForSingleObject(events[param->threadInd], INFINITE); 
     InterlockedIncrement((LONG*)&param->isCalculating); 
    } 
} 

void calculate(double* sum, unsigned start) { 
    int endLoop = start + BLOCK_SIZE; 
    double x; 
    for (unsigned i = start; i < endLoop; ++i) { 
     if (i < N) { 
      x = (i + 0.5)/N; 
      *sum += 4/(1 + x*x); 
     } else break; 
    } 
} 
+0

你試過沒有線程運行相同的代碼?即直接調用'ThreadProc'替換'ResumeThread'調用,並檢查一切正常。 – yeputons

+0

它似乎工作正常。所有計算都在主線程中發生時也可以正常工作。它在計算結束時開始出現故障,因爲結果在小數點後的第一位或第二位出現波動 – Madhas

+1

然後它看起來像競爭條件,這可能是由於線程之間缺乏適當的同步。我可以看到,你使用了一些像isCalculating這樣的變量/字段來進行_some_同步,甚至是原子操作(這聽起來對於這種問題來說太難了)。我建議你爲每個線程分配一個事件(「完成計算」),然後在主線程中使用'WaitForMultipleObjects'而不是for循環。 – yeputons

回答

0

你根本不需要事件。工作線程永遠不能等待。所有這些都可以通過互鎖操作來完成。

class CALC_TASK 
{ 
    C_ASSERT(sizeof(double)==sizeof(__int64)); 

    union { 
     double _sum; 
     __int64 _value; 
    }; 

    double _step; 
    LONG _startPos, _maxPos, _blockSize; 

    static double calculate(LONG i, LONG count, double step) 
    { 
     double x, sum = 0.0; 

     do 
     { 
      x = (i-- - 0.5) * step; 
      sum += 4.0/(1.0 + x*x); 
     } while (--count); 

     return step * sum; 
    } 

    void add(double x) 
    { 
     union { 
      double sum; 
      __int64 value; 
     }; 

     union { 
      double new_sum; 
      __int64 new_value; 
     }; 

     sum = _sum; 

     for (; ; value = new_value) 
     { 
      new_sum = sum + x; 

      new_value = _InterlockedCompareExchange64(&_value, new_value, value); 

      if (new_value == value) 
      { 
       return; 
      } 
     } 
    } 

    BOOL getBlock(LONG& pos, LONG& count) 
    { 
     LONG startPos, newPos, curPos; 

     for (startPos = _startPos ; startPos < _maxPos; startPos = curPos) 
     { 
      if ((newPos = startPos + _blockSize) > _maxPos) 
      { 
       newPos = _maxPos; 
      } 

      curPos = _InterlockedCompareExchange(&_startPos, newPos, startPos); 

      if (curPos == startPos) 
      { 
       pos = newPos, count = newPos - startPos; 
       return TRUE; 
      } 
     } 

     return FALSE; 
    } 

    void calculateEx() 
    { 
     LONG i, count; 

     double step = _step; 

     while (getBlock(i, count)) 
     { 
      add(calculate(i, count, step)); 
     } 
    } 

public: 

    double getsum() 
    { 
     return _sum; 
    } 

    CALC_TASK(LONG maxPos, LONG blockSize) 
    { 
     _startPos = 0, _blockSize = blockSize, _maxPos = maxPos, _sum = 0.0, _step = 1.0/(double)maxPos; 
    } 

    static DWORD CALLBACK ThreadProc(PVOID pct) 
    { 
     reinterpret_cast<CALC_TASK*>(pct)->calculateEx(); 
     return 0; 
    } 
}; 

double test(LONG maxPos, LONG blockSize) 
{ 
    SYSTEM_INFO si; 
    GetNativeSystemInfo(&si); 
    PHANDLE phThreads = 0; 
    ULONG nThreads = 0; 

    CALC_TASK ct(maxPos, blockSize); 

    if (1 < si.dwNumberOfProcessors) 
    { 
     ULONG n = si.dwNumberOfProcessors - 1; 

     phThreads = (HANDLE*)alloca(n * sizeof(HANDLE)); 

     do 
     { 
      if (*phThreads = CreateThread(NULL, PAGE_SIZE, CALC_TASK::ThreadProc, &ct, 0, 0)) 
      { 
       nThreads++, phThreads++; 
      } 
     } while (--n); 
    } 

    CALC_TASK::ThreadProc(&ct); 

    if (nThreads) 
    { 
     WaitForMultipleObjects(nThreads, phThreads - nThreads, TRUE, INFINITE); 
     do 
     { 
      CloseHandle(*--phThreads); 
     } while (--nThreads); 
    } 

    return ct.getsum(); 
} 

void testEx() 
{ 
    ULONG dwStart = GetTickCount(); 

    double d = test(0x10000000, 0x8000); 

    dwStart = GetTickCount() - dwStart; 

    DbgPrint("pi = %.15f, %u milliseconds\n", d, dwStart); 
} 
在我和8芯測試

/胎面它採取約0.5秒0x10000000步驟。與單線程這需要〜3.5秒。

作爲替代,你可以使用openMP

double calculateOMP(int num_steps) 
{ 
    double x, sum = 0.0, step = 1.0/(double) num_steps; 

#pragma omp parallel for reduction(+:sum) private(x) 

    for (int i = num_steps; i > 0; --i) 
    { 
     x = (i - 0.5) * step; 
     sum += 4.0/(1.0 + x*x); 
    } 

    return step * sum; 
} 

爲此,你需要使用/openmp選項並添加vcomp.lib進行接頭輸入