我正在努力實現一個工作流引擎到我的項目中,我的嘗試的主要目的是創建一個便攜式應用程序。有些東西,我將來可以放入任何其他項目中,然後將工作流程附加到項目中的不同模型中並使其運行。Django工作流引擎使用信號和Celery-django
我試圖想到一種方法,但它似乎並不是完美的設置。我想在我的項目中創建一個工作流應用程序,附加兩種模型,其中一些將包含工作流(工作流,步驟,操作)的設置,其他模型將包含實例/事務。
下面是我的工作流程/ models.py
from django.db import models
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes import generic
import signals
################################
# Workflow engine models
################################
class Signal_Listener(models.Model):
LISTENING_ON = (
('I', 'INSERT'),
('U', 'UPDATE'),
('D', 'DELETE'),
('S', 'SELECT'),
('A', 'ANY QUERY'),
('L', 'ANY DDL'),
)
table_name = models.CharField(max_length=100)
listening_to = models.CharField(max_length=1, choices=LISTENING_ON)
class Meta:
unique_together = (("table_name", "listening_to"),)
def __unicode__(self):
return '%s - %s' % (self.table_name, self.listening_to)
class Action(models.Model):
ACTION_TYPE_CHOICES = (
('P', 'Python Script' ),
('C', 'Class name' ),
)
name = models.CharField(max_length=100)
action_type = models.CharField(max_length=1, choices=ACTION_TYPE_CHOICES)
audit_obj = generic.GenericRelation('core.Audit', editable=False)
class Steps(models.Model):
sequence = models.IntegerField()
Action = models.ForeignKey(Action)
Signal_Listener = models.ForeignKey(Signal_Listener)
class Process(models.Model):
## TODO: Document
# Processes class is used to store information about the process itself.
# Or in another word, the workflow name.
WF_TYPE_LIST = (
('P', 'Python-API'),
)
name = models.CharField(max_length=30)
is_active = models.BooleanField()
wf_type = models.CharField(max_length=1, choices=WF_TYPE_LIST)
audit_obj = generic.GenericRelation('core.Audit', editable=False)
listening_to = models.ForeignKey(Steps)
################################
# Workflow transactions models
################################
class Instance(models.Model):
## TODO: Document
# Re
INSTANCE_STATUS = (
('I', 'In Progress'),
('C', 'Cancelled' ),
('A', 'Archived' ), # Old completed tasks can be archived
('P', 'Pending' ),
('O', 'Completed' )
)
id = models.CharField(max_length=200, primary_key=True)
status = models.CharField(max_length=1, choices=INSTANCE_STATUS, db_index=True)
audit_obj = generic.GenericRelation('core.Audit', editable=False)
def save(self, *args, **kwargs):
# on new records generate a new uuid
if self.id is None or self.id.__len__() is 0:
import uuid
self.id = uuid.uuid4().__str__()
super(Instances, self).save(*args, **kwargs)
class Task(models.Model):
TASK_STATUS = (
('S', 'Assigned' ),
('I', 'In Progress'),
('P', 'Pending' ),
('C', 'Cancelled' ),
('A', 'Archived' ), # Old completed tasks can be archived
('O', 'Completed' )
)
name = models.CharField(max_length=100)
instance = models.ForeignKey(Instance)
status = models.CharField(max_length=1, choices=TASK_STATUS)
bio = models.CharField(max_length=100)
audit_obj = generic.GenericRelation('core.Audit', editable=False)
,我也有一個工作流/ signals.py
"""
Workflow Signals
Will be used to monitor all inserts, update, delete or select statements
If an action is attached to that particular table, it will be inserted Celery-Tasks distribution.
"""
from django.db.models.signals import post_save, post_delete
from django.core.cache import cache
def workflow_post_init_listener(sender, **kwargs):
try:
if cache.get('wf_listner_cache_%s' % kwargs['instance']._meta.db_table):
pass
else:
record = 'Signal_Listener'.objects.get(table_name__exact=kwargs['instance']._meta.db_table)
# am not sure what to do next!
except 'Signal_Listener'.DoesNotExist:
# TODO: Error logging
pass
post_save.connect(workflow_post_init_listener, dispatch_uid="workflow.models.listener.save")
我覺得我的模型設計可能需要提高爲好。我可以在幾種情況下使用,並且我想從批准週期開始,例如,我可以在signal_listener中插入table_name/model_name以監視新插入。如果是這樣,我會觸發一個特定的工作流程。
至於行動,我明白行動將需要發展。也許我需要在工作流應用程序下創建一個操作文件夾,將每個操作放在一個類中。每個人都會完成發送電子郵件,存檔,更新數據庫值的特殊任務。如果你正在重新發明輪子,並且如果有這樣的東西已經被開發出來,任何人都可以建議,我會更樂意通過它。例如,你可以去看看的zope.wfmc
最好的問候,
我認爲你的問題可能會被改寫。目前尚不清楚你真正想問什麼。 – roman 2011-01-11 00:37:43