2012-12-07 59 views
5

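Periodic jobs when running multiple servers

I am planning to deploy an application with Play and have never used its "Jobs" before. My deployment will be large enough to need several Play servers for load balancing, but my computations are not heavy enough to need Hadoop/Storm/others.

My question is: how do I handle this situation in Play? If I set up a job in Play to run every minute, I don't want every server to do exactly the same thing at the same time.

I could only find this answer, but I don't like any of those options.

So, is there any tool or best practice for coordinating jobs, or do I have to build something from scratch?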

+0

Palako, did you find a solution? – angelokh

+0

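Hi Angelohk. Well, yes. This question is really old, and I have run into this scenario many times since then and applied different solutions, but the idea has always been the same: some kind of concurrency-control lock in the database. Different database engines let me do different things. Either way, you need to be careful about how you implement concurrent access to the database when different Play jobs try to acquire the lock (or locks). – palako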

Answers

0

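Personally, I would run the jobs on a single instance, just for simplicity. Also, if you want finer control over execution and better concurrency and parallel processing, you could look at using Akka instead of Jobs.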
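
As a minimal sketch of the single-instance idea, written in plain Java rather than against Play's Job API: every node runs the same code, but the task body only executes on the node whose id matches a designated runner id. The class `DesignatedRunner`, its method names and the node ids are hypothetical; how a node learns its own id (a configuration key, an environment variable, and so on) is an assumption left to the deployment.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a periodic task only on one designated node. */
final class DesignatedRunner {
    private final String nodeId;   // id of this node (assumed to come from configuration)
    private final String runnerId; // id of the only node allowed to run jobs

    DesignatedRunner(String nodeId, String runnerId) {
        this.nodeId = nodeId;
        this.runnerId = runnerId;
    }

    boolean isRunner() {
        return nodeId.equals(runnerId);
    }

    /** Runs the task if this node is the designated runner; returns whether it ran. */
    boolean runIfDesignated(Runnable task) {
        if (!isRunner()) {
            return false;
        }
        task.run();
        return true;
    }

    /** Schedules the guarded task at a fixed rate; the caller must shut the scheduler down. */
    ScheduledExecutorService schedule(Runnable task, long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> runIfDesignated(task), 0, period, unit);
        return scheduler;
    }
}
```

The trade-off is that the designated node becomes a single point of failure for the jobs: if it goes down, nothing runs until another node is designated.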

0

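You can store a job lock in a database table, but you have to check and update this lock in a separate transaction (you have to use JPA.newEntityManager for this).

My JobLock class uses a LockMode enum: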

package enums; 

public enum LockMode { 
    FREE, ACQUIRED; 
} 

Here is the JobLock class:

package models; 

import java.util.Date; 
import java.util.List; 

import javax.persistence.Entity; 
import javax.persistence.EntityManager; 
import javax.persistence.EnumType; 
import javax.persistence.Enumerated; 
import javax.persistence.Version; 

import play.Logger; 
import play.Play; 
import play.data.validation.Required; 
import play.db.jpa.JPA; 
import play.db.jpa.Model; 
import utils.Parser; 
import enums.LockMode; 
import exceptions.ServiceException; 

/** 
* Technical class that manages a lock in the database so that multiple 
* instances can be synchronized and cannot run the same job at the same 
* time. 
* 
* @author sebastien 
*/ 
@Entity 
public class JobLock extends Model { 

    private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
      "job.lock.acquisitiondelay", "10000")); 

    @Required 
    public String jobName; 

    public Date acquisitionDate; 

    @Required 
    @Enumerated(EnumType.STRING) 
    public LockMode lockMode; 

    @Version 
    public int version; 

    // STATIC METHODS 
    // ~~~~~~~~~~~~~~~ 

    /** 
    * Acquire the lock for the type of job identified by the name parameter. 
    * The lock is acquired in a separate transaction so that this transaction 
    * is as small as possible and other instances see the lock acquisition 
    * sooner. 
    * <p> 
    * If we did not do that, the other instances would be blocked until the 
    * instance that acquired the lock had finished its business transaction, 
    * which could take a long time for a job. 
    * </p> 
    * 
    * @param name 
    *   the name that identifies a job category, usually the job's 
    *   simple class name 
    * @return the lock object if the acquisition is successful, null otherwise 
    */ 
    public static JobLock acquireLock(String name) { 
     EntityManager em = JPA.newEntityManager(); 
     try { 
      em.getTransaction().begin(); 
      List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class) 
        .setParameter("name", name).setMaxResults(1).getResultList(); 
      JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null; 
      if (lock == null) { 
       lock = new JobLock(); 
       lock.jobName = name; 
       lock.acquisitionDate = new Date(); 
       lock.lockMode = LockMode.ACQUIRED; 
       em.persist(lock); 
      } else { 
       if (LockMode.ACQUIRED.equals(lock.lockMode)) { 
        if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) { 
         throw new ServiceException(String.format(
           "Lock is held for too much time : there is a problem with job %s", name)); 
        } 
        // Another node holds the lock: end the transaction before leaving 
        em.getTransaction().rollback(); 
        return null; 
       } 
       lock.lockMode = LockMode.ACQUIRED; 
       lock.acquisitionDate = new Date(); 
       lock.willBeSaved = true; 
      } 
      em.flush(); 
      em.getTransaction().commit(); 
      return lock; 
     } catch (Exception e) { 
      // Do not log exception here because it is normal to have exception 
      // in case of multi-node installation, this is the way to avoid 
      // multiple job execution 
      if (em.getTransaction().isActive()) { 
       em.getTransaction().rollback(); 
      } 
      // Maybe we have to inverse the test and to define which exception 
      // is not problematic : exception that denotes concurrency in the 
      // database are normal 
      if (e instanceof ServiceException) { 
       throw (ServiceException) e; 
      } else { 
       return null; 
      } 
     } finally { 
      if (em.isOpen()) { 
       em.close(); 
      } 
     } 
    } 

    /** 
    * Release the lock in the database so that another instance can take it. 
    * This changes the {@link #lockMode} to FREE and sets 
    * {@link #acquisitionDate} to null. It is done in a separate transaction, 
    * which can see what happened in the database while the business 
    * transaction was running. 
    * 
    * @param lock 
    *   the lock to release 
    * @return true if we managed to release the lock and false otherwise 
    */ 
    public static boolean releaseLock(JobLock lock) { 
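     if (lock == null || LockMode.FREE.equals(lock.lockMode)) { 
      return false; 
     } 

     // Create the EntityManager only when there is a lock to release, 
     // so the early return above cannot leave one unclosed 
     EntityManager em = JPA.newEntityManager(); 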

     try { 
      em.getTransaction().begin(); 
      lock = em.find(JobLock.class, lock.id); 
      lock.lockMode = LockMode.FREE; 
      lock.acquisitionDate = null; 
      lock.willBeSaved = true; 
      em.persist(lock); 
      em.flush(); 
      em.getTransaction().commit(); 
      return true; 
     } catch (Exception e) { 
      if (em.getTransaction().isActive()) { 
       em.getTransaction().rollback(); 
      } 
      Logger.error(e, "Error during commit of lock release"); 
      return false; 
     } finally { 
      if (em.isOpen()) { 
       em.close(); 
      } 
     } 
    } 
} 
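
The acquire and release rules of this class can be tried out without a database. The following is only an in-memory model, not the code above: `InMemoryJobLocks` and its method names are hypothetical, a `ConcurrentHashMap` stands in for the table, and the atomic `putIfAbsent` plays the role that the separate transaction and the `@Version` check play in the JPA class. It keeps the same three outcomes: a free lock is taken, a held lock is refused, and a lock held longer than the maximum delay is reported as an error.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the JobLock table. */
final class InMemoryJobLocks {
    // jobName -> acquisition time in milliseconds; a missing key means FREE
    private final ConcurrentMap<String, Long> acquiredAt = new ConcurrentHashMap<>();
    private final long maxAcquisitionDelayMillis;

    InMemoryJobLocks(long maxAcquisitionDelayMillis) {
        this.maxAcquisitionDelayMillis = maxAcquisitionDelayMillis;
    }

    /**
     * Tries to take the lock at time nowMillis.
     *
     * @return true if acquired, false if another holder already has it
     * @throws IllegalStateException if the lock has been held longer than the maximum delay
     */
    boolean acquire(String jobName, long nowMillis) {
        // putIfAbsent is atomic: only one caller can insert the entry
        Long previous = acquiredAt.putIfAbsent(jobName, nowMillis);
        if (previous == null) {
            return true;
        }
        if (nowMillis - previous > maxAcquisitionDelayMillis) {
            throw new IllegalStateException("Lock is held for too long for job " + jobName);
        }
        return false;
    }

    /** Frees the lock; returns false if it was not held. */
    boolean release(String jobName) {
        return acquiredAt.remove(jobName) != null;
    }
}
```

With a maximum delay of 10000 ms, a first `acquire("CleanupJob", 0)` succeeds, a second call at 5000 ms returns false, and a call at 20001 ms throws because the lock looks stale.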

And here is my LockAwareJob, which uses this lock:

package jobs; 

import models.JobLock; 
import notifiers.ExceptionMails; 
import play.Logger; 
import play.jobs.Job; 

public abstract class LockAwareJob<V> extends Job<V> { 

    @Override 
    public final void doJob() throws Exception { 
     String name = this.getClass().getSimpleName(); 
     try { 
      JobLock lock = JobLock.acquireLock(name); 
      if (lock != null) { 
       Logger.info("Starting %s", name); 
       try { 
        doJobWithLock(lock); 
       } finally { 
        if (!JobLock.releaseLock(lock)) { 
         Logger.error("Lock acquired but cannot be released for %s", name); 
        } 
        Logger.info("End of %s", name); 
       } 
      } else { 
       Logger.info("Another node is running %s : nothing to do", name); 
      } 
     } catch (Exception ex) { 
      ExceptionMails.exception(ex, String.format("Error while executing job %s", name)); 
      throw ex; 
     } 
    } 

    @Override 
    public final V doJobWithResult() throws Exception { 
     String name = this.getClass().getSimpleName(); 
     try { 
      JobLock lock = JobLock.acquireLock(name); 
      if (lock != null) { 
       Logger.info("Starting %s", name); 
       try { 
        return resultWithLock(lock); 
       } finally { 
        if (!JobLock.releaseLock(lock)) { 
         Logger.error("Lock acquired but cannot be released for %s", name); 
        } 
        Logger.info("End of %s", name); 
       } 
      } else { 
       Logger.info("Another node is running %s : nothing to do", name); 
       return resultWithoutLock(); 
      } 
     } catch (Exception ex) { 
      ExceptionMails.exception(ex, String.format("Error while executing job %s", name)); 
      throw ex; 
     } 
    } 

    public void doJobWithLock(JobLock lock) throws Exception { 
    } 

    public V resultWithLock(JobLock lock) throws Exception { 
     doJobWithLock(lock); 
     return null; 
    } 

    public V resultWithoutLock() throws Exception { 
     return null; 
    } 
} 

In my log4j.properties I added a dedicated line to avoid an error being logged every time an instance fails to acquire the job lock:

log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL 

With this solution you can also use the JobLock id to store parameters associated with the job (for example, the last run date).
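
As a rough sketch of that idea, the state kept alongside the lock could include a last-run timestamp, so that a node that obtains the lock can still skip a run that another node finished a moment ago. The class `JobRunState` and the names `isDue`, `markRun` and `getLastRunMillis` are hypothetical and not part of the JobLock class shown above; in the JPA version this would presumably be an extra column or a related entity keyed by the JobLock id.

```java
/** Hypothetical per-job state that could be stored next to the lock. */
final class JobRunState {
    private Long lastRunMillis; // null until the job has completed once

    /** True if the job never ran, or if the last run is at least intervalMillis old. */
    boolean isDue(long nowMillis, long intervalMillis) {
        return lastRunMillis == null || nowMillis - lastRunMillis >= intervalMillis;
    }

    /** Records a completed run at the given time. */
    void markRun(long nowMillis) {
        lastRunMillis = nowMillis;
    }

    Long getLastRunMillis() {
        return lastRunMillis;
    }
}
```

This matters when the nodes do not fire at exactly the same instant: if one node finishes and releases the lock quickly, a second node firing slightly later would otherwise acquire the free lock and repeat the work.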
