feature(redis): 引入redis作为缓存机制

fix(cp_run): 修复存储时间戳
feature(preview): preview 施工中
This commit is contained in:
ClovertaTheTrilobita 2025-09-17 22:43:36 +03:00
parent 297284bf27
commit 0349c190d4
5 changed files with 131 additions and 28 deletions

View file

@ -3,15 +3,10 @@ from cellpose.io import imread, save_masks
from PIL import Image from PIL import Image
import numpy as np import numpy as np
import os, datetime import os, datetime
from typing import Literal import time
from sympy import false
class Cprun: class Cprun:
# def __init__(self, model: str | Literal["cpsam"], images: list[str] | str):
# self.model = model
# self.images = images
@classmethod @classmethod
def run_test(cls): def run_test(cls):
@ -26,7 +21,7 @@ class Cprun:
f"[{i}] mask max={int(getattr(m, 'max', lambda: 0)()) if hasattr(m, 'max') else int(np.max(m))}, unique={np.unique(m)[:5]} ..." f"[{i}] mask max={int(getattr(m, 'max', lambda: 0)()) if hasattr(m, 'max') else int(np.max(m))}, unique={np.unique(m)[:5]} ..."
) )
ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + f"-{int(time.time()*1000)%1000:03d}"
outdir = os.path.join(os.path.dirname(__file__), "test_output", ts) outdir = os.path.join(os.path.dirname(__file__), "test_output", ts)
os.makedirs(outdir, exist_ok=True) # 自动创建目录 os.makedirs(outdir, exist_ok=True) # 自动创建目录
for img, mask, flow, name in zip(imgs, masks, flows, files): for img, mask, flow, name in zip(imgs, masks, flows, files):
@ -43,8 +38,8 @@ class Cprun:
@classmethod @classmethod
async def run(cls, async def run(cls,
images: list[str] | str | None = None, images: list[str] | str | None = None,
time: datetime.datetime | None = None, time: str | None = None,
model: str | str = "cpsam", model: str = "cpsam",
diameter: float | None = None, diameter: float | None = None,
flow_threshold: float | float = 0.4, flow_threshold: float | float = 0.4,
cellprob_threshold: float | float = 0.0, ): cellprob_threshold: float | float = 0.0, ):
@ -67,8 +62,8 @@ class Cprun:
diameter=diameter diameter=diameter
) )
ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") ts = time
outdir = os.path.join(os.path.dirname(__file__), "run_output", ts) outdir = os.path.join(os.path.dirname(__file__), "output", ts)
os.makedirs(outdir, exist_ok=True) # 自动创建目录 os.makedirs(outdir, exist_ok=True) # 自动创建目录
for img, mask, flow, name in zip(imgs, masks, flows, files): for img, mask, flow, name in zip(imgs, masks, flows, files):
base = os.path.join(outdir, os.path.splitext(os.path.basename(name))[0]) base = os.path.join(outdir, os.path.splitext(os.path.basename(name))[0])
@ -82,4 +77,5 @@ class Cprun:
Image.fromarray(over).save(base + "_overlay.png") Image.fromarray(over).save(base + "_overlay.png")
message.append(f"Output saved to: {outdir}") message.append(f"Output saved to: {outdir}")
message.append(outdir)
return [True, message] return [True, message]

View file

@ -1,39 +1,74 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from flask import Flask, send_from_directory, request, jsonify from flask import Flask, send_from_directory, request, jsonify
import os, shutil, time, threading, datetime import os, shutil, time, threading, datetime, json, redis
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from flask_cors import CORS from flask_cors import CORS
from pathlib import Path from pathlib import Path
from cp_run import Cprun
app = Flask(__name__) app = Flask(__name__)
CORS(app) CORS(app)
BASE_DIR = Path(__file__).resolve().parent BASE_DIR = Path(__file__).resolve().parent
UPLOAD_DIR = BASE_DIR / "uploads" UPLOAD_DIR = BASE_DIR / "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(UPLOAD_DIR, exist_ok=True)
executor = ThreadPoolExecutor(max_workers=4)
TASKS = {}
r = redis.Redis(host="127.0.0.1", port=6379, db=0)
def run_flask(): # 启动测试服务器
def run_dev():
app.run(host="10.147.18.141", port=5000) app.run(host="10.147.18.141", port=5000)
def set_status(task_id, status, **extra):
payload = {"status": status, "updated_at": datetime.datetime.utcnow().isoformat(), **extra}
r.set(f"task:{task_id}", json.dumps(payload), ex=86400) # 1 天过期
def get_status(task_id):
raw = r.get(f"task:{task_id}")
return json.loads(raw) if raw else None
@app.route("/") @app.route("/")
def index(): def index():
return "<h1>Hello</h1><p>This is the backend of our cellpose server, please visit our website.</p>" return "<h1>Hello</h1><p>This is the backend of our cellpose server, please visit our website.</p>"
@app.route("/testdl") @app.get("/testdl")
def test_download(): def test_download():
return send_from_directory("test_output/2025-09-16-20-03-51", "img_overlay.png", as_attachment=True) return send_from_directory("test_output/2025-09-16-20-03-51", "img_overlay.png", as_attachment=True)
@app.route("/dl/<timestamp>") @app.get("/dl/<timestamp>")
def download(timestamp): def download(timestamp):
input_dir = os.path.join("output", timestamp) input_dir = os.path.join(BASE_DIR, "output", timestamp)
output_dir = os.path.join("output/tmp", timestamp) # 不要加 .zipmake_archive 会自动加 output_dir = os.path.join(BASE_DIR, "output/tmp", timestamp) # 不要加 .zipmake_archive 会自动加
os.makedirs("output/tmp", exist_ok=True) # 确保 tmp 存在 os.makedirs(BASE_DIR / "output/tmp", exist_ok=True) # 确保 tmp 存在
shutil.make_archive(output_dir, 'zip', input_dir) shutil.make_archive(output_dir, 'zip', input_dir)
print(f"压缩完成: {output_dir}.zip") print(f"压缩完成: {output_dir}.zip")
return send_from_directory("output/tmp", f"{timestamp}.zip", as_attachment=True) return send_from_directory("output/tmp", f"{timestamp}.zip", as_attachment=True)
@app.post("/upload") @app.post("/upload")
def upload(): def upload():
ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") """
接收上传的文件并将其发送给cellpose
:return:
"""
# 从请求中获取参数,若没有则设定为默认值
model = request.args.get("model") or request.form.get("model") or "cpsam"
def _to_float(x, default):
try:
return float(x)
except (TypeError, ValueError):
return default
flow_threshold = _to_float(request.args.get("flow_threshold") or request.form.get("flow_threshold"), 0.4)
cellprob_threshold = _to_float(request.args.get("cellprob_threshold") or request.form.get("cellprob_threshold"),
0.0)
diameter_raw = request.args.get("diameter") or request.form.get("diameter")
diameter = _to_float(diameter_raw, None) if diameter_raw not in (None, "") else None
# 将文件保存在本地目录中
ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + f"-{int(time.time()*1000)%1000:03d}"
os.makedirs(UPLOAD_DIR / ts, exist_ok=True) os.makedirs(UPLOAD_DIR / ts, exist_ok=True)
files = request.files.getlist("files") files = request.files.getlist("files")
saved = [] saved = []
@ -42,5 +77,41 @@ def upload():
continue continue
name = secure_filename(f.filename) name = secure_filename(f.filename)
f.save(os.path.join(UPLOAD_DIR / ts, name)) f.save(os.path.join(UPLOAD_DIR / ts, name))
saved.append(name) saved.append(os.path.join(UPLOAD_DIR, ts, name))
return jsonify({"ok": True, "count": len(saved), "files": saved, "id": ts})
# 新建一个线程,防止返回被阻塞
def job():
return asyncio.run(Cprun.run(
images=saved, model=model,
cellprob_threshold=cellprob_threshold,
flow_threshold=flow_threshold,
diameter=diameter, time=ts
))
# 将线程状态存入redis
set_status(ts, "running")
fut = executor.submit(job)
def done_cb(f):
try:
f.result()
set_status(ts, "success")
except Exception as e:
set_status(ts, "failed", error=str(e))
fut.add_done_callback(done_cb)
return jsonify({"ok": True, "count": len(saved), "id": ts})
@app.get("/status/")
def status():
"""
检查某一cellpose任务是否完成
:return:
"""
task_id = request.args.get('id')
st = get_status(task_id)
if not st:
return jsonify({"ok": True, "exists": False, "status": "not_found"}), 200
return jsonify({"ok": True, "exists": True, **st}), 200

View file

@ -1,10 +1,10 @@
from cp_run import Cprun from cp_run import Cprun
from flaskApp import run_flask from flaskApp import run_dev
from multiprocessing import Process from multiprocessing import Process
if __name__ == "__main__": if __name__ == "__main__":
# Cprun.run_test() # Cprun.run_test()
p = Process(target=run_flask) p = Process(target=run_dev)
p.start() p.start()
print(f"Flask running in PID {p.pid}") print(f"Flask running in PID {p.pid}")

View file

@ -1,3 +1,37 @@
<!DOCTYPE html> <!DOCTYPE html>
<meta charset="UTF-8"> <meta charset="UTF-8" />
<h1>This is preview</h1> <h1>This is preview</h1>
<p id="none-exist" hidden>id not exists</p>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<script type="module">
const API_BASE = "http://10.147.18.141:5000/status/";
const params = new URLSearchParams(window.location.search);
const ID = params.get("id");
const msg = document.getElementById("none-exist");
// 没有 id直接提示
if (!ID) {
msg.textContent = "missing id in URL";
msg.hidden = false;
} else {
try {
const res = await axios.get(API_BASE + "?id=" + encodeURIComponent(ID));
const { exists, status } = res.data; // exists: boolean, status: "running" | "success" | "failed"...
// 只在“不存在”时显示这条消息
if (!exists) {
msg.textContent = `id "${ID}" not exists`;
msg.hidden = false;
} else {
msg.hidden = true; // 存在就隐藏这条“not exists”的提示
}
} catch (e) {
msg.textContent = "request failed";
msg.hidden = false;
console.error(e);
}
}
</script>

View file

@ -2,5 +2,7 @@ numpy~=2.1.2
cellpose~=4.0.6 cellpose~=4.0.6
pillow~=11.0.0 pillow~=11.0.0
sympy~=1.13.3 sympy~=1.13.3
redis~=6.4.0
flask~=3.1.2 flask~=3.1.2
werkzeug~=3.1.3 werkzeug~=3.1.3
flask-cors~=6.0.1