반응형
python-rq에 유사한 예약된 작업이 있는지 확인하는 방법은 무엇입니까?
서버 시작 시 작업을 예약하기 위해 호출되는 함수는 다음과 같습니다.
그러나 어떻게든 예정된 작업이 반복적으로 호출되고 있으며, 이로 인해 해당 기능에 너무 많은 호출이 발생하고 있습니다.
이것이 여러 개의 함수 호출 때문에 발생하는 것입니까, 아니면 다른 것입니까? 추천해 주세요.
def redis_schedule():
with current_app.app_context():
redis_url = current_app.config["REDIS_URL"]
with Connection(redis.from_url(redis_url)):
q = Queue("notification")
from ..tasks.notification import send_notifs
task = q.enqueue_in(timedelta(minutes=5), send_notifs)
다음은 모든 작업을 검사할 수 있는 상당히 깨끗한 대안입니다:
수동 ID를 작업에 할당할 수 있습니다(예: 전달).
다음은 지정된 ID를 가진 작업이 이미 존재하는지 확인하는 코드 조각입니다. 또한 잘못된 긍정을 방지하기 위해 동일한 ID를 가진 이전 작업을 정리합니다.
def job_exists(job_id, cleanup_stale=True):
try:
job = Job.fetch(id=job_id, connection=Redis())
except NoSuchJobError:
return False
status = job.get_status()
if status in {JobStatus.QUEUED, JobStatus.SCHEDULED}:
# Job exists and will be run.
return True
# Delete old jobs that have been completed, cancelled, stopped etc.
job.delete()
return False
빠르게 실행되는 작업이 연속적으로 빠르게 진행되지 않도록 하려면 작업을 지연(예: 5초)하여 실행하는 것이 좋습니다.
도움이 된다면 알려주세요.
참조 -
읽기 및 검색이 필요합니다. 현재 아래의 논리는 하나밖에 없기 때문에 저에게 적합합니다. 하지만 여러 작업의 경우 올바른 존재 여부를 찾기 위해 이것들을 반복해야 합니다.
def redis_schedule():
with current_app.app_context():
redis_url = current_app.config["REDIS_URL"]
with Connection(redis.from_url(redis_url)):
q = Queue("notification")
if len(q.scheduled_job_registry.get_job_ids()) == 0:
from ..tasks.notification import send_notifs
task = q.enqueue_in(timedelta(seconds=30), send_notifs)
반응형
'개발하자' 카테고리의 다른 글
큐버네티스 포드에 대한 리소스(V1ResourceRequirements) 개체를 기류 내에서 동적으로 구축하는 방법 (0) | 2023.08.19 |
---|---|
인증 시 FastApi 422 처리할 수 없는 엔티티, 수정하는 방법? (0) | 2023.08.18 |
TypeError: _evaluate()는 3개의 위치 인수를 사용하지만 4개는 fastapi에서 제공되었습니다 (0) | 2023.08.17 |
주피터 노트북 내부에서 pytest 테스트 기능 (0) | 2023.08.16 |
Pycharm Jupyter 노트북 wsl: Jupyter 패키지가 설치되지 않았습니다 (0) | 2023.08.16 |