2011-11-01

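Using fork with setjmp/longjmp

I am trying to implement a fork-based checkpointing scheme for a multithreaded program, using fork together with setjmp/longjmp. I hoped my solution would work, but it does not behave as expected. The code below shows an example of how checkpoint/rollback is used.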

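The main idea is to allocate the thread stacks myself, as the function pthread_create_with_stack does, and then to fork from the main thread. The forked process (the checkpoint) pauses as soon as it starts. When it is woken up (rollback), its main thread re-creates the other threads by calling pthread_create, giving them the same stacks that the threads used in the original process. In addition, each re-created thread does a longjmp at the start of its thread routine, so that it jumps back to the point in the code where it was when the process was forked for the checkpoint. Note that all setjmp calls are made inside the function my_pthread_barrier_wait, so that no thread is holding a lock at that point.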

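I think the problem here is setjmp/longjmp. Would the ucontext functions (getcontext/setcontext/makecontext/swapcontext) help here, or something else? Can setjmp/longjmp even be used in a way that makes this work? Any solution would be greatly appreciated.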

#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <math.h> 
#include <unistd.h> 
#include <semaphore.h> 
#include <signal.h> 
#include <sys/types.h> 
#include <setjmp.h> 

#define PERFORM_JMP 

#define NUM_THREADS 4 

void *stackAddr[NUM_THREADS]; 
pthread_t thread[NUM_THREADS]; 
jmp_buf buf[NUM_THREADS]; 
pthread_attr_t attr[NUM_THREADS]; 
pthread_barrier_t bar; 
sem_t sem; 
pid_t cp_pid; 
int rollbacked; 
int iter; 
long thread_id[NUM_THREADS]; 

void *BusyWork(void *t); 

void sig_handler(int signum) 
{ 
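    /* printf() is not async-signal-safe; write() and sem_post() are */
    static const char msg[] = "signal_handler posting sem!\n";
    (void)signum;
    write(STDOUT_FILENO, msg, sizeof msg - 1);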
    sem_post(&sem); 
} 

int pthread_create_with_stack(void *(*start_routine) (void *), int tid) 
{ 
    const size_t STACKSIZE = 0xC00000; //12582912 
    size_t i; 
    pid_t pid; 
    int rc; 

    printf("tid = %d\n", tid); 

    pthread_attr_init(&attr[tid]); 
    stackAddr[tid] = malloc(STACKSIZE); 
    pthread_attr_setstack(&attr[tid], stackAddr[tid], STACKSIZE); 

    thread_id[tid] = tid; 
    rc = pthread_create(&thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid]); 

    if (rc) 
    { 
     printf("ERROR; return code from pthread_create() is %d\n", rc); 
     exit(-1); 
    } 

    return rc; 
} 

pid_t checkpoint() 
{ 
    pid_t pid; 
    int t, rc; 

    switch (pid=fork()) 
    { 
    case -1: 
     perror("fork"); 
     break; 
    case 0:   // child process starts 
     sem_wait(&sem); 
     rollbacked = 1; 
     printf("case 0: rollbacked = 1, my pid is %d\n", getpid()); 
     for(t = 1; t < NUM_THREADS; t++) 
     { 
      printf("checkpoint: creating thread %d again\n", t); 
      rc = pthread_create(&thread[t], &attr[t], BusyWork, (void*)&thread_id[t]); 
      if (rc) 
      { 
       printf("ERROR; return code from pthread_create() is %d\n", rc); 
       exit(-1); 
      } 
     } 
     return 1; // child process ends 
    default:  // parent process starts 
     return pid; 
    } 
} 

void restart_from_checkpoint(pid_t pid) 
{ 
    printf("Restart_from_checkpoint, sending signal to %d!\n", pid); 
    kill(pid, SIGUSR1); 
    exit(0); 
} 

void take_checkpoint_or_rollback(int sig_diff) 
{ 
    if (cp_pid) 
    { 
     if (sig_diff) 
     { 
      printf("rollbacking\n"); 
      if (!rollbacked) 
       restart_from_checkpoint(cp_pid); 
     } 
     else 
     { 
      kill(cp_pid, SIGKILL); 
      cp_pid = checkpoint(); 
      printf("%d: cp_pid = %d!\n", getpid(), cp_pid); 
     } 
    } 
    else 
     cp_pid = checkpoint(); 
} 

void my_pthread_barrier_wait(int tid, pthread_barrier_t *pbar) 
{ 
    pthread_barrier_wait(pbar); 
#ifdef PERFORM_JMP 
    if (tid == 0) 
    { 
     if (!rollbacked) 
     { 
      take_checkpoint_or_rollback(++iter == 4); 
     } 
    } 
    if (setjmp(buf[tid]) != 0) {} 
    else {} 
    printf("%d: %d is waiting at the second barrier!\n", getpid(), tid); 
#endif 
    pthread_barrier_wait(pbar); 
} 

void *BusyWork(void *t) 
{ 
    volatile int i; 
    volatile long tid = *((long*)t); 
    volatile double result = 0.0; 

    printf("thread %ld in BusyWork!\n", tid); 
#ifdef PERFORM_JMP 
    if (rollbacked) 
    { 
    printf("hmm, thread %ld is now doing a longjmp, goodluck!\n", tid); 
    longjmp(buf[tid], 1); 
    } 
#endif 
    printf("Thread %ld starting...\n",tid); 
    for (i = 0; i < 10; i++) 
    { 
     result += (tid+1) * i; 
     printf("%d: tid %ld: result = %g\n", getpid(), tid, result); 
     my_pthread_barrier_wait(tid, &bar); 
    } 
    printf("Thread %ld done. Result = %g\n", tid, result); 
    return NULL; /* pthread_join() in main reads this value */
} 

int main (int argc, char *argv[]) 
{ 
    int rc; 
    long t; 
    void *status; 

    /* Initialize the barrier shared by all NUM_THREADS threads */
    pthread_barrier_init(&bar, NULL, NUM_THREADS); 
#ifdef PERFORM_JMP 
    signal(SIGUSR1, sig_handler); 
    sem_init(&sem, 0, 0); 
#endif 
    for(t = 1; t < NUM_THREADS; t++) 
    { 
     printf("Main: creating thread %ld\n", t); 
     rc = pthread_create_with_stack(BusyWork, t); 
     if (rc) 
     { 
     printf("ERROR; return code from pthread_create() is %d\n", rc); 
     exit(-1); 
     } 
    } 

    thread_id[0] = 0; 
    BusyWork(&thread_id[0]); 

    /* Wait for the other threads */
    for(t=1; t<NUM_THREADS; t++) 
    { 
     rc = pthread_join(thread[t], &status); 
     if (rc) 
     { 
     printf("ERROR; return code from pthread_join() is %d\n", rc); 
     exit(-1); 
     } 
     printf("Main: completed join with thread %ld having a status " 
      "of %ld\n",t,(long)status); 
    } 

    printf("Main: program completed. Exiting.\n"); 
    pthread_exit(NULL); 
} 

Answer

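What you are trying to do is simply not possible. fork is fundamentally incompatible with synchronization. Even if you could reliably re-create the threads in the child process, they would have new thread IDs, so they would not be the owners of the locks they are supposed to hold.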

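The only way to do checkpointing like this is with advanced operating-system support. That would have to include separate pid namespaces, so that the checkpointed copy of the program has the same pid and all of its threads have the same thread IDs. Even then, it will not work if the program communicates with other processes or the outside world. I believe there are tools that can do this on Linux, but I am not familiar with them, and at that point you are running into problems where it would be better to ask, in the appropriate place, whether there is a better way to achieve what you are trying to do.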

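Good point. In this case I made the simplifying assumption that I use my own thread IDs, but, as you say, some things use the real thread IDs, such as pthread_barrier_wait in this example. – MetallicPriest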