Skip to content

biohub.core.tasks

hsfzxjy edited this page Aug 8, 2017 · 2 revisions

概述

此为 biohub 异步任务模块,需安装并正确配置 redis

示例

# biohub/some_app/tasks.py

import time
from biohub.core.tasks import Task

class MyTask(Task):

    def run(self, a, b):
        time.sleep(10)
        result = a + b
        # Store result or broadcast to user via websocket

调用:

from biohub.core.tasks import apply_async
from biohub.some_app.tasks import MyTask # 注意此处一定要用完整路径引入

MyTask.apply_async(4, 4)
apply_async(MyTask, args=(4,4)) # 和上一条等价
apply_async(MyTask, kwargs=dict(a=4, b=4)) # 和上一条等价
apply_async(MyTask, args=(4,4), timeout=5) # 设置时限

apply_async 方法不会阻塞,它会返回一个 MyTask 实例:

task_instance = MyTask.apply_async(4, 4)
print(isinstance(task_instance, MyTask)) # True

使用 task_instance.status 可以查询运行状态。status 有如下几个取值:

  • tasks.GONE: 任务不存在
  • tasks.PENDING: 等待执行
  • tasks.RUNNING: 正在执行
  • tasks.TIMEOUT: 超时
  • tasks.DONE: 在正常时间内结束

注意,一个已完成(超时或正常结束)的任务的状态只会保留两个小时,两个小时后再查询结果就会转为 GONE

可以通过 task_instance.task_id 获得任务的 id,此 id 可在之后被用于状态查询:

task_id = task_instance.task_id

# .........

from biohub.core.tasks import get_status
print(get_status(task_id))

相关配置

可以在 config.json 中设置任务队列的参数:

  • MAX_TASKS: 最多可以同时执行的任务数目,默认为 5 * 你的 CPU 数
  • TASK_MAX_TIMEOUT:最大超时秒数,默认为 180

注意事项

此模块使用多线程技术完成(曾想使用多进程,但暂时失败了...),然而 Python 并没有提供终止线程的接口(事实上也不应该暴力终止线程,会造成不稳定),因此 当一个任务超时时并没有合适的手段将其从外部终止,终止操作需任务自己来完成Task 类提供了一个 check_interrupt 方法,当任务未超时时,调用此方法不做任何操作;当任务超时时,调用此方法会抛出一个异常以终止任务。如果需要写一个相当耗时甚至会死循环的任务,建议 频繁并适时地调用 check_interrupt 方法

class BadTask(Task):

    def run(self):
        time.sleep(10)
        # Do something

class GoodTask(Task):

    def run(self):
        for _ in range(10):
            time.sleep(1)
            self.check_interrupt()
            # Do something

    def before_interrupt(self):
        # 此处可以在中断前做清理操作
Clone this wiki locally