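Periodic jobs when running multiple servers
I plan to deploy an application with Play and have never used its "Jobs" before. My deployment will be big enough to need several Play servers behind a load balancer, but my computations are not big enough to need Hadoop/Storm/others.
My question is: how do I handle this scenario in Play? If I set up a job in Play to run every minute, I don't want every server to do exactly the same thing at the same time.
I could only find this answer, but I don't like any of its options.
So, are there any tools or best practices for coordinating jobs, or do I have to build something from scratch?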
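You can use a database flag, as described by Pere Villega in Playframework concurrent jobs management (an example with two jobs).
But I think the solution Guillaume Bort gave on Google Groups, which uses Memcache, is the best one. There seems to be a Play 2 memcached module: https://github.com/mumoshu/play2-memcached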
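As I understand the Memcache idea, it relies on memcached's `add` command, which stores a key only if it is not already present, combined with an expiration time: every node tries to `add` the same job key, and only the node whose `add` succeeds runs the job. Below is a minimal sketch of those semantics, assuming an in-memory `ConcurrentHashMap` as a stand-in for the shared memcached server; `LeaseCache`, the key format and the node ids are hypothetical names, not the play2-memcached API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * In-memory stand-in for a shared memcached server (hypothetical). add()
 * mimics memcached's "add": the value is stored only if the key is absent
 * or its previous entry has expired.
 */
final class LeaseCache {
    private static final class Entry {
        final String owner;
        final long expiresAtMillis;

        Entry(String owner, long expiresAtMillis) {
            this.owner = owner;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

    /** Returns true if the caller now holds the key until nowMillis + ttlMillis. */
    boolean add(String key, String owner, long ttlMillis, long nowMillis) {
        Entry fresh = new Entry(owner, nowMillis + ttlMillis);
        while (true) {
            Entry current = entries.putIfAbsent(key, fresh);
            if (current == null) {
                return true; // key was absent: this node runs the job
            }
            if (current.expiresAtMillis > nowMillis) {
                return false; // another node holds an unexpired entry
            }
            // Expired entry: replace exactly that entry; loop again if we raced.
            if (entries.replace(key, current, fresh)) {
                return true;
            }
        }
    }

    /** Owner of the current entry, or null; handy for logging. */
    String owner(String key) {
        Entry e = entries.get(key);
        return e == null ? null : e.owner;
    }
}
```

Each node's periodic job would call something like `cache.add("job-lock:" + jobName, nodeId, ttl, now)` and run only when it returns true. Keep in mind that memcached may evict entries early under memory pressure, so this approach gives "usually once" rather than a hard guarantee.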
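Personally, I would run the jobs on a single instance, just for simplicity. Also, if you want finer control over execution and better concurrency and parallel processing, you could look at using Akka instead of Jobs.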
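The "single instance" option usually comes down to a configuration switch that is turned on for exactly one node. A minimal sketch with plain `java.util.concurrent` instead of Play Jobs or Akka, assuming a hypothetical `jobs.enabled` property; the trade-off is that the designated node becomes a single point of failure for the jobs.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleNodeScheduler {
    // Hypothetical property name: set it to "true" on exactly one node.
    static final String JOBS_ENABLED_KEY = "jobs.enabled";

    static boolean jobsEnabled(Properties config) {
        return Boolean.parseBoolean(config.getProperty(JOBS_ENABLED_KEY, "false"));
    }

    /** Schedules the task every minute on the designated node; returns null on the others. */
    static ScheduledExecutorService startIfDesignated(Properties config, Runnable task) {
        if (!jobsEnabled(config)) {
            return null; // not the job node: schedule nothing
        }
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.MINUTES);
        return scheduler;
    }
}
```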
You can use a table to store a jobLock in the database, but you have to check/update this lock in a separate transaction (you must use JPA.newEntityManager for this).
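What makes the database lock safe is optimistic locking on the lock row (the `@Version` field in the JobLock class): two nodes may read the same FREE row, but the JPA provider only lets the first commit through and rejects the second with an optimistic-locking exception. The sketch below simulates that compare-and-set on a version with an `AtomicReference`, assuming no real database; `VersionedLockRow` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Simulates a JobLock row protected by a @Version column (hypothetical, in memory). */
final class VersionedLockRow {
    /** Immutable snapshot of the row, as a node reads it in its own transaction. */
    static final class Snapshot {
        final boolean acquired;
        final int version;

        Snapshot(boolean acquired, int version) {
            this.acquired = acquired;
            this.version = version;
        }
    }

    private final AtomicReference<Snapshot> row =
            new AtomicReference<>(new Snapshot(false, 0));

    Snapshot read() {
        return row.get();
    }

    /**
     * Analogous to "UPDATE ... SET lock_mode='ACQUIRED', version=version+1
     * WHERE id=? AND version=?": succeeds only if nobody committed in between.
     */
    boolean tryAcquire(Snapshot seen) {
        if (seen.acquired) {
            return false; // already held by another node
        }
        return row.compareAndSet(seen, new Snapshot(true, seen.version + 1));
    }

    /** Marks the row FREE again, bumping the version as an UPDATE would. */
    void release() {
        row.updateAndGet(s -> new Snapshot(false, s.version + 1));
    }
}
```

This is also why the exception in `acquireLock` is considered normal: the losing node's commit fails on the version check.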
My JobLock class uses a LockMode enum:
package enums;
public enum LockMode {
FREE, ACQUIRED;
}
Here is the JobLock class:
package models;
import java.util.Date;
import java.util.List;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Version;
import play.Logger;
import play.Play;
import play.data.validation.Required;
import play.db.jpa.JPA;
import play.db.jpa.Model;
import utils.Parser;
import enums.LockMode;
import exceptions.ServiceException;
/**
* Technical class that manages a lock in the database so that multiple
* instances can be synchronized and cannot run the same job at the same time.
*
* @author sebastien
*/
@Entity
public class JobLock extends Model {
private static final Long MAX_ACQUISITION_DELAY = Parser.parseLong(Play.configuration.getProperty(
"job.lock.acquisitiondelay", "10000"));
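@Required
// Unique constraint: without it, two nodes that both find no row can each
// insert their own JobLock for the same job (applied when the schema is generated)
@javax.persistence.Column(unique = true)
public String jobName;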
public Date acquisitionDate;
@Required
@Enumerated(EnumType.STRING)
public LockMode lockMode;
@Version
public int version;
// STATIC METHODS
// ~~~~~~~~~~~~~~~
/**
* Acquire the lock for the type of job identified by the name parameter.
* The lock is acquired in a separate transaction so that this transaction is
* as small as possible and other instances see the lock acquisition sooner.
* <p>
* If we did not do that, the other instances would be blocked until the
* instance that acquired the lock has finished its business transaction,
* which could be long for a job.
* </p>
*
* @param name
* the name that identifies a job category, usually the job's
* simple class name
* @return the lock object if the acquisition is successful, null otherwise
*/
public static JobLock acquireLock(String name) {
EntityManager em = JPA.newEntityManager();
try {
em.getTransaction().begin();
List<JobLock> locks = em.createQuery("from JobLock where jobName=:name", JobLock.class)
.setParameter("name", name).setMaxResults(1).getResultList();
JobLock lock = locks != null && !locks.isEmpty() ? locks.get(0) : null;
if (lock == null) {
lock = new JobLock();
lock.jobName = name;
lock.acquisitionDate = new Date();
lock.lockMode = LockMode.ACQUIRED;
em.persist(lock);
} else {
if (LockMode.ACQUIRED.equals(lock.lockMode)) {
if ((System.currentTimeMillis() - lock.acquisitionDate.getTime()) > MAX_ACQUISITION_DELAY) {
throw new ServiceException(String.format(
"Lock is held for too much time : there is a problem with job %s", name));
}
return null;
}
lock.lockMode = LockMode.ACQUIRED;
lock.acquisitionDate = new Date();
lock.willBeSaved = true;
}
em.flush();
em.getTransaction().commit();
return lock;
} catch (Exception e) {
// Do not log the exception here: in a multi-node installation it is
// normal to get one, because this is how multiple executions of the
// same job are avoided
if (em.getTransaction().isActive()) {
em.getTransaction().rollback();
}
// Maybe we should invert the test and define which exceptions are
// not problematic: exceptions that denote concurrency in the
// database are normal
if (e instanceof ServiceException) {
throw (ServiceException) e;
} else {
return null;
}
} finally {
if (em.isOpen()) {
em.close();
}
}
}
/**
* Release the lock in the database so that another instance can take it. This
* action changes the {@link #lockMode} and sets {@link #acquisitionDate} to
* null. This is done in a separate transaction that has visibility on what
* happened in the database during the business transaction.
*
* @param lock
* the lock to release
* @return true if we managed to release the lock, false otherwise
*/
public static boolean releaseLock(JobLock lock) {
if (lock == null || LockMode.FREE.equals(lock.lockMode)) {
return false;
}
// Open the EntityManager only after the early return so it is always closed below
EntityManager em = JPA.newEntityManager();
try {
em.getTransaction().begin();
lock = em.find(JobLock.class, lock.id);
lock.lockMode = LockMode.FREE;
lock.acquisitionDate = null;
lock.willBeSaved = true;
em.persist(lock);
em.flush();
em.getTransaction().commit();
return true;
} catch (Exception e) {
if (em.getTransaction().isActive()) {
em.getTransaction().rollback();
}
Logger.error(e, "Error during commit of lock release");
return false;
} finally {
if (em.isOpen()) {
em.close();
}
}
}
}
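The branching inside `acquireLock` can be isolated as a pure function, which makes it testable without a database. A sketch under that assumption; `LockDecision` and its parameters are hypothetical names, and it mirrors only the decision, not the persistence. One consequence worth noticing: with the default `job.lock.acquisitiondelay` of 10000 ms, any job that legitimately runs longer than 10 seconds is reported as a problem by the other nodes, so the delay should exceed the longest expected run.

```java
import java.util.Date;

final class LockDecision {
    enum Outcome { ACQUIRE, BUSY, STALE }

    /**
     * Mirrors the branches of JobLock.acquireLock: a missing or FREE lock can
     * be taken; an ACQUIRED lock held for at most maxDelayMillis means another
     * node is working; one held longer is reported as a problem (STALE).
     */
    static Outcome decide(boolean exists, boolean acquired, Date acquisitionDate,
                          long nowMillis, long maxDelayMillis) {
        if (!exists || !acquired) {
            return Outcome.ACQUIRE;
        }
        long heldForMillis = nowMillis - acquisitionDate.getTime();
        return heldForMillis > maxDelayMillis ? Outcome.STALE : Outcome.BUSY;
    }
}
```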
And here is my LockAwareJob, which uses this lock:
package jobs;
import models.JobLock;
import notifiers.ExceptionMails;
import play.Logger;
import play.jobs.Job;
public abstract class LockAwareJob<V> extends Job<V> {
@Override
public final void doJob() throws Exception {
String name = this.getClass().getSimpleName();
try {
JobLock lock = JobLock.acquireLock(name);
if (lock != null) {
Logger.info("Starting %s", name);
try {
doJobWithLock(lock);
} finally {
if (!JobLock.releaseLock(lock)) {
Logger.error("Lock acquired but cannot be released for %s", name);
}
Logger.info("End of %s", name);
}
} else {
Logger.info("Another node is running %s : nothing to do", name);
}
} catch (Exception ex) {
ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
throw ex;
}
}
@Override
public final V doJobWithResult() throws Exception {
String name = this.getClass().getSimpleName();
try {
JobLock lock = JobLock.acquireLock(name);
if (lock != null) {
Logger.info("Starting %s", name);
try {
return resultWithLock(lock);
} finally {
if (!JobLock.releaseLock(lock)) {
Logger.error("Lock acquired but cannot be released for %s", name);
}
Logger.info("End of %s", name);
}
} else {
Logger.info("Another node is running %s : nothing to do", name);
return resultWithoutLock();
}
} catch (Exception ex) {
ExceptionMails.exception(ex, String.format("Error while executing job %s", name));
throw ex;
}
}
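// Override to implement the job; only called while this node holds the lock
public void doJobWithLock(JobLock lock) throws Exception {
}
// Override to return a result while holding the lock (defaults to running doJobWithLock)
public V resultWithLock(JobLock lock) throws Exception {
doJobWithLock(lock);
return null;
}
// Result returned when another node already holds the lock
public V resultWithoutLock() throws Exception {
return null;
}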
}
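The essential control flow of LockAwareJob (acquire, run, release in a `finally`, fall back when another node holds the lock) can be exercised without Play or a database. A minimal sketch; `JobLockStore` and `LockedRunner` are hypothetical names standing in for `JobLock.acquireLock`/`releaseLock`. In the real class, a concrete job would extend LockAwareJob, override `doJobWithLock`, and be scheduled as usual in Play 1 (for example with `@Every`).

```java
import java.util.concurrent.Callable;

/** Hypothetical abstraction over JobLock.acquireLock / JobLock.releaseLock. */
interface JobLockStore {
    Object acquire(String jobName); // null when another node holds the lock

    boolean release(Object lock);
}

final class LockedRunner {
    /** Runs body only if the lock is acquired; always releases it afterwards. */
    static <V> V runWithLock(JobLockStore store, String jobName,
                             Callable<V> body, V resultWithoutLock) throws Exception {
        Object lock = store.acquire(jobName);
        if (lock == null) {
            return resultWithoutLock; // another node is running this job
        }
        try {
            return body.call();
        } finally {
            store.release(lock); // runs even if body throws
        }
    }
}
```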
In my log4j.properties I added a dedicated line so that an error is not logged every time an instance fails to acquire the job lock:
log4j.logger.org.hibernate.event.def.AbstractFlushingEventListener=FATAL
With this solution, you can also use the JobLock ID to store parameters associated with the job (for example, the last run date).
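palako, did you find a solution? – angelokh
Hi angelokh. Well, yes. This question is really old, and since then I have run into this scenario many times and applied different solutions, but the idea has always been the same: some kind of concurrency-control lock in the database. Different database engines let me do different things. Still, you need to be careful how you implement concurrent access to the database when acquiring the lock (or locks) from different Play jobs. – palako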