我試圖實現基於多線程叉的檢查點方案使用叉與setjmp/longjmp。我希望我的解決方案能夠發揮作用,但如預期那樣。代碼如下所示,用於檢查點/回滾的示例用法。使用fork與setjmp/longjmp
主要想法是爲自己分配線程堆棧,就像使用函數pthread_create_with_stack完成的那樣,然後在主線程中使用一個分支。分叉進程(檢查點)在開始時暫停,當喚醒(回滾)時,分叉進程的主線程通過調用pthread_create重新創建線程,並在原始進程中使用與線程相同的堆棧。另外longjmp在開始的線程例程中完成,以便當進程被分支爲檢查點時跳轉到代碼中的相同點。請注意,所有setjmp調用都在函數my_pthread_barrier_wait內完成,以便沒有線程獲得鎖定。
我覺得這裏的問題是setjmp/lonjmp。請問getcontext/savecontext/makecontext幫助這裏,還是其他什麼?甚至可以使用setjmp/longjmp這樣的方式,它的工作原理?任何解決方案將不勝感激。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/types.h>
#include <setjmp.h>
#define PERFORM_JMP
#define NUM_THREADS 4
void *stackAddr[NUM_THREADS];
pthread_t thread[NUM_THREADS];
jmp_buf buf[NUM_THREADS];
pthread_attr_t attr[NUM_THREADS];
pthread_barrier_t bar;
sem_t sem;
pid_t cp_pid;
int rollbacked;
int iter;
long thread_id[NUM_THREADS];
void *BusyWork(void *t);
void sig_handler(int signum)
{
printf("signal_handler posting sem!\n");
sem_post(&sem);
}
int pthread_create_with_stack(void *(*start_routine) (void *), int tid)
{
const size_t STACKSIZE = 0xC00000; //12582912
size_t i;
pid_t pid;
int rc;
printf("tid = %d\n", tid);
pthread_attr_init(&attr[tid]);
stackAddr[tid] = malloc(STACKSIZE);
pthread_attr_setstack(&attr[tid], stackAddr[tid], STACKSIZE);
thread_id[tid] = tid;
rc = pthread_create(&thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid]);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
return rc;
}
pid_t checkpoint()
{
pid_t pid;
int t, rc;
switch (pid=fork())
{
case -1:
perror("fork");
break;
case 0: // child process starts
sem_wait(&sem);
rollbacked = 1;
printf("case 0: rollbacked = 1, my pid is %d\n", getpid());
for(t = 1; t < NUM_THREADS; t++)
{
printf("checkpoint: creating thread %d again\n", t);
rc = pthread_create(&thread[t], &attr[t], BusyWork, (void*)&thread_id[t]);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
return 1; // child process ends
default: // parent process starts
return pid;
}
}
void restart_from_checkpoint(pid_t pid)
{
printf("Restart_from_checkpoint, sending signal to %d!\n", pid);
kill(pid, SIGUSR1);
exit(0);
}
void take_checkpoint_or_rollback(int sig_diff)
{
if (cp_pid)
{
if (sig_diff)
{
printf("rollbacking\n");
if (!rollbacked)
restart_from_checkpoint(cp_pid);
}
else
{
kill(cp_pid, SIGKILL);
cp_pid = checkpoint();
printf("%d: cp_pid = %d!\n", getpid(), cp_pid);
}
}
else
cp_pid = checkpoint();
}
void my_pthread_barrier_wait(int tid, pthread_barrier_t *pbar)
{
pthread_barrier_wait(pbar);
#ifdef PERFORM_JMP
if (tid == 0)
{
if (!rollbacked)
{
take_checkpoint_or_rollback(++iter == 4);
}
}
if (setjmp(buf[tid]) != 0) {}
else {}
printf("%d: %d is waiting at the second barrier!\n", getpid(), tid);
#endif
pthread_barrier_wait(pbar);
}
void *BusyWork(void *t)
{
volatile int i;
volatile long tid = *((long*)t);
volatile double result = 0.0;
printf("thread %ld in BusyWork!\n", tid);
#ifdef PERFORM_JMP
if (rollbacked)
{
printf("hmm, thread %ld is now doing a longjmp, goodluck!\n", tid);
longjmp(buf[tid], 1);
}
#endif
printf("Thread %ld starting...\n",tid);
for (i = 0; i < 10; i++)
{
result += (tid+1) * i;
printf("%d: tid %ld: result = %g\n", getpid(), tid, result);
my_pthread_barrier_wait(tid, &bar);
}
printf("Thread %ld done. Result = %g\n", tid, result);
//pthread_exit((void*) t);
}
int main (int argc, char *argv[])
{
int rc;
long t;
void *status;
/* Initialize and set thread detached attribute */
pthread_barrier_init(&bar, NULL, NUM_THREADS);
#ifdef PERFORM_JMP
signal(SIGUSR1, sig_handler);
sem_init(&sem, 0, 0);
#endif
for(t = 1; t < NUM_THREADS; t++)
{
printf("Main: creating thread %ld\n", t);
rc = pthread_create_with_stack(BusyWork, t); // This is the line 52
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
thread_id[0] = 0;
BusyWork(&thread_id[0]);
/* Free attribute and wait for the other threads */
for(t=1; t<NUM_THREADS; t++)
{
rc = pthread_join(thread[t], &status);
if (rc)
{
printf("ERROR; return code from pthread_join() is %d\n", rc);
exit(-1);
}
printf("Main: completed join with thread %ld having a status"
"of %ld\n",t,(long)status);
}
printf("Main: program completed. Exiting.\n");
pthread_exit(NULL);
}
好點,在這種情況下,我做了一個簡化的假設,我使用自己的線程ID,但是像你說的那樣,有些事情使用真正的線程ID,就像本例中的pthread_barrier_wait一樣。 – MetallicPriest