-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathapp.py
103 lines (76 loc) · 2.69 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import dramatiq
import os
import time
from contextlib import closing
from dramatiq.brokers.rabbitmq import URLRabbitmqBroker
from flask import Flask, redirect, render_template, request
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import func
from threading import local
# The Flask application object; routes below are registered on it.
app = Flask(__name__)
# Database connection string: DATABASE_URL wins, SQLITE_URL is the fallback.
# If neither variable is set this is None.
database_url = os.getenv("DATABASE_URL", os.getenv("SQLITE_URL"))
database = create_engine(database_url)
# Declarative base class that ORM models in this module inherit from.
Model = declarative_base()
# Session factory bound to the engine above; call Session() to get a session.
Session = sessionmaker(bind=database)
# Broker connection string: CLOUDAMQP_URL wins, BROKER_URL is the fallback.
# If neither variable is set this is None.
broker_url = os.getenv("CLOUDAMQP_URL", os.getenv("BROKER_URL"))
broker = URLRabbitmqBroker(broker_url)
# Register the broker with dramatiq before any actors are declared below.
dramatiq.set_broker(broker)
class AppContextMiddleware(dramatiq.Middleware):
    """Dramatiq middleware that wraps message handling in a Flask app context.

    A fresh application context is pushed before a message is processed and
    popped again once processing finishes or the message is skipped.
    """

    # Thread-local storage, so each thread tracks only the context it pushed.
    state = local()

    def __init__(self, app):
        """Remember the Flask application whose context will be pushed.

        Args:
            app: Object providing ``app_context()`` (the Flask application).
        """
        self.app = app

    def before_process_message(self, broker, message):
        """Push a new application context and remember it for this thread."""
        context = self.app.app_context()
        context.push()
        self.state.context = context

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """Pop the application context pushed for the current thread, if any.

        Args:
            broker: The broker emitting the hook (unused).
            message: The message that was handled (unused).
            result: Result of the actor, if it succeeded (unused).
            exception: Exception raised while processing, forwarded to
                ``context.pop`` so teardown handlers can see it.
        """
        # Only a *missing* context is expected here (for example when this
        # hook runs without a matching before_process_message on this thread).
        # The lookup is kept separate from the pop so that an AttributeError
        # raised inside pop() is not mistaken for that case and hidden.
        context = getattr(self.state, "context", None)
        if context is None:
            return

        try:
            context.pop(exception)
        finally:
            # Always forget the context, even if popping it failed, so a
            # stale context is never left behind in thread-local state.
            del self.state.context

    # Skipped messages need the same cleanup as processed ones.
    after_skip_message = after_process_message
# Install the app-context middleware on the broker, wired to this Flask app.
broker.add_middleware(AppContextMiddleware(app))
class Job(Model):
    """ORM model for a row in the ``jobs`` table.

    A job has a type (which decides how long ``process`` takes) and a status
    that starts out as pending.
    """

    __tablename__ = "jobs"

    # Job types understood by ``process``.
    TYPE_SLOW = "slow"
    TYPE_FAST = "fast"
    TYPES = (TYPE_SLOW, TYPE_FAST)

    # Status values used by this module.
    STATUS_PENDING = "pending"
    STATUS_DONE = "done"
    STATUSES = (STATUS_PENDING, STATUS_DONE)

    id = Column(Integer, primary_key=True)
    type = Column(String(10), nullable=False)
    status = Column(String(10), nullable=False, default=STATUS_PENDING)
    # Timestamp is filled in by the database via its now() function.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    def process(self):
        """Simulate the work for this job by sleeping.

        Slow jobs sleep for 30 seconds and fast jobs for 1 second.

        Raises:
            ValueError: If the job's type is neither slow nor fast.
        """
        kind = self.type

        if kind == Job.TYPE_SLOW:
            time.sleep(30)
            return

        if kind == Job.TYPE_FAST:
            time.sleep(1)
            return

        raise ValueError("Unknown job type.")
@dramatiq.actor
def process_job(job_id):
    """Run the job with the given id and mark it as done.

    Args:
        job_id: Primary key of the ``Job`` row to process.

    Raises:
        LookupError: If no job with ``job_id`` exists.
        ValueError: Propagated from ``Job.process`` for an unknown job type.
    """
    with closing(Session()) as session:
        job = session.query(Job).get(job_id)
        if job is None:
            # Fail with a message naming the missing id instead of an opaque
            # AttributeError from calling a method on None.
            raise LookupError("Job {!r} does not exist.".format(job_id))

        job.process()
        # The job was loaded through this session, so the change is tracked
        # and flushed by commit() without re-adding the object.
        job.status = Job.STATUS_DONE
        session.commit()
@app.route("/jobs", methods=["POST"])
def add_job():
    """Create a job from the submitted form and enqueue it for processing.

    Reads the ``type`` form field, stores a new ``Job`` row, sends its id to
    the ``process_job`` actor and redirects back to ``/``.

    Returns:
        A redirect to ``/`` on success, or a plain-text 400 response when the
        submitted type is not one of ``Job.TYPES``.
    """
    job_type = request.form["type"]
    # The form value is untrusted: reject anything Job.process() would refuse,
    # rather than persisting and enqueueing a job that can only fail.
    if job_type not in Job.TYPES:
        return "Unknown job type.", 400

    with closing(Session()) as session:
        job = Job(type=job_type)
        session.add(job)
        session.commit()
        # Enqueue only after the commit so the row exists when the worker
        # looks it up; job.id is read while the session is still open.
        process_job.send(job.id)

    return redirect("/")
@app.route("/")
def index():
    """Render ``index.html`` with every job, most recently created first."""
    with closing(Session()) as db:
        newest_first = Job.created_at.desc()
        job_query = db.query(Job).order_by(newest_first)
        # Render while the session is still open, as the template receives
        # the loaded Job objects directly.
        return render_template("index.html", jobs=job_query.all())