Carlos Aguni

Highly motivated self-taught IT analyst. Always learning and ready to explore new skills. An eternal apprentice.


Flask Queue

22 Sep 2022 »
from flask import Flask, request, jsonify, json, abort, redirect, url_for, render_template, g
from flask_cors import CORS, cross_origin
import os
import re
import time
import subprocess
import random
import traceback
import datetime
import uuid
from queue import Queue
from threading import Thread, Lock

# Flask application; templates are looked up in the "template" folder.
app = Flask(__name__, template_folder='template')
# Attach the flask_cors extension to the app.
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'

# NOTE(review): this lock is created but never acquired in the visible code.
threading_lock = Lock()
# Jobs waiting to be picked up by the worker threads.
myqueue = Queue()
# Every submitted job keyed by its job id; read by the /tasks route.
# NOTE(review): entries are never removed in the visible code, so this grows
# for the lifetime of the process.
myqueue_res = {}

class Job:
    """Mutable record describing one unit of queued work.

    All fields are declared up front with neutral defaults so that every
    instance has the same shape; callers overwrite them as the job moves
    through the queue.
    """

    def __init__(self):
        # Unique identifier assigned when the job is submitted.
        self.id = None
        # Number of seconds the worker sleeps to simulate the work.
        self.sleep = 0
        # Lifecycle state string written by the producer and the workers.
        self.status = None
        # Timestamp set when the job is submitted.
        self.start = None
        # Timestamp set once a worker is done with the job.
        self.end = None
        # Formatted traceback, populated only if the job fails.
        self.error = None

def worker_runner(tid, q):
    """Consume jobs from *q* forever, simulating work by sleeping.

    Each job is expected to expose ``id`` and ``sleep`` attributes. The worker
    writes ``status`` ("RUNNING", then "FINISHED" or "FAILED"), ``end`` and,
    on failure, ``error`` (a formatted traceback) back onto the job.

    Args:
        tid: Label identifying this worker in the printed log lines.
        q: Queue-like object providing blocking ``get()`` and ``task_done()``.

    This function never returns normally; run it in a daemon thread.
    """
    print(tid, "started")
    while True:
        job = q.get()
        # Read the id once so the failure handler cannot raise a second time
        # when the job object itself is malformed.
        job_id = getattr(job, "id", None)
        try:
            print(tid, job_id, "started")
            job.status = "RUNNING"
            time.sleep(job.sleep)
        except Exception:
            # Only ordinary errors are treated as job failures; SystemExit and
            # KeyboardInterrupt are allowed to propagate.
            print(tid, job_id, "failed")
            job.error = traceback.format_exc()
            job.status = "FAILED"
        else:
            job.status = "FINISHED"
            print(tid, job_id, "finished")
        finally:
            # Always stamp the end time and acknowledge the item so that
            # q.join() cannot hang on a job that blew up.
            job.end = datetime.datetime.now()
            q.task_done()
        # Pause between jobs, as in the original loop.
        time.sleep(1)


# Start a fixed pool of three worker threads at import time. They are daemon
# threads, so they do not keep the process alive on shutdown.
for i in range(3):
    worker = Thread(
        target=worker_runner,
        args=(f"#{i+1:02d}", myqueue),
        # Passed to the constructor because Thread.setDaemon() is deprecated.
        daemon=True,
    )
    worker.start()

def produce_job(job):
    """Stamp *job* with its bookkeeping fields, enqueue it and register it.

    The job receives a fresh hex UUID as ``id``, the current time as
    ``start``, ``None`` as ``end`` and the status "QUEUE". It is then put on
    ``myqueue`` and stored in ``myqueue_res`` under its id.

    Returns:
        The newly generated job id.
    """
    new_id = uuid.uuid4().hex
    initial_fields = {
        "id": new_id,
        "start": datetime.datetime.now(),
        "end": None,
        "status": "QUEUE",
    }
    # Dict order is insertion order, so attributes are set in the listed order.
    for attr, value in initial_fields.items():
        setattr(job, attr, value)

    myqueue.put(job)
    myqueue_res[new_id] = job
    return new_id

@app.route('/tasks', methods=['GET', 'POST'])
def r_tasks():
    """Return a JSON list describing every job registered so far.

    Each element carries the job's id, status, start time and end time.
    """
    # Iterate over a snapshot: other request threads may add jobs while this
    # response is being built, and iterating the live dict could then raise
    # "dictionary changed size during iteration".
    snapshot = list(myqueue_res.items())

    return jsonify([
        {
            "jobid": jobid,
            "status": job.status,
            "start": job.start,
            "end": job.end,
        }
        for jobid, job in snapshot
    ])

@app.route('/', methods=['GET', 'POST'])
def main():
    """Return a fixed greeting for the site root."""
    return "hello world"

@app.route('/produce', methods=['GET', 'POST'])
def r_produce():
    """Create a job with a random 1-8 second duration and submit it.

    Returns a plain-text confirmation containing the new job id.
    """
    new_job = Job()
    new_job.sleep = random.randint(1, 8)
    return f"job created {produce_job(new_job)}"

# gunicorn --workers=2 'app:create_app()' --bind=0.0.0.0:<port>
def create_app():
    """Return the module-level Flask ``app`` object.

    Intended as the factory callable referenced in the gunicorn command above.

    NOTE(review): the job queue, result dict and worker threads are
    module-level state, so with several gunicorn worker processes each
    process presumably gets its own copy -- confirm before relying on
    /tasks to list jobs created through another worker.
    """
    return app

# Script entry point: serve the app directly when the file is run as a script.
if __name__ == '__main__':
    # Listen on all interfaces ('0.0.0.0') on port 9000.
    app.run(host='0.0.0.0', port=9000)

    # Manual smoke test (disabled): request "/" through Flask's test client.
    #with app.test_client() as c:
    #    rs = c.get("/")
    #    print(rs.data)