diff --git a/.idea/cellpose-web-backend.iml b/.idea/cellpose-web.iml similarity index 82% rename from .idea/cellpose-web-backend.iml rename to .idea/cellpose-web.iml index 3c7d613..82e95be 100644 --- a/.idea/cellpose-web-backend.iml +++ b/.idea/cellpose-web.iml @@ -1,7 +1,7 @@ - + diff --git a/.idea/modules.xml b/.idea/modules.xml index 89ae0ac..6067172 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/cp_run.py b/cp_run.py index 49d6e43..c69c95b 100644 --- a/cp_run.py +++ b/cp_run.py @@ -3,29 +3,83 @@ from cellpose.io import imread, save_masks from PIL import Image import numpy as np import os, datetime +from typing import Literal -def test(): - model = models.CellposeModel(gpu=True) - files = ['img.png'] - imgs = [imread(f) for f in files] - masks, flows, styles = model.eval( - imgs, flow_threshold=0.4, cellprob_threshold=0.0 - ) - for i, m in enumerate(masks): - print( - f"[{i}] mask max={int(getattr(m, 'max', lambda: 0)()) if hasattr(m, 'max') else int(np.max(m))}, unique={np.unique(m)[:5]} ..." +from sympy import false + + +class Cprun: + # def __init__(self, model: str | Literal["cpsam"], images: list[str] | str): + # self.model = model + # self.images = images + + @classmethod + def run_test(cls): + model = models.CellposeModel(gpu=True) + files = ['test_tif/img.png'] + imgs = [imread(f) for f in files] + masks, flows, styles = model.eval( + imgs, flow_threshold=0.4, cellprob_threshold=0.0 + ) + for i, m in enumerate(masks): + print( + f"[{i}] mask max={int(getattr(m, 'max', lambda: 0)()) if hasattr(m, 'max') else int(np.max(m))}, unique={np.unique(m)[:5]} ..." + ) + + ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + outdir = os.path.join(os.path.dirname(__file__), "test_output", ts) + os.makedirs(outdir, exist_ok=True) # 自动创建目录 + for img, mask, flow, name in zip(imgs, masks, flows, files): + base = os.path.join(outdir, os.path.splitext(os.path.basename(name))[0]) + #使用内置绘图生成蒙版 + out = base + "_output" + save_masks(imgs, mask, flow, out, tif=True) + + # 用 plot 生成彩色叠加图(不依赖 skimage) + rgb = plot.image_to_rgb(img, channels=[0, 0]) # 原图转 RGB + over = plot.mask_overlay(rgb, masks=mask, colors=None) # 叠加彩色实例 + Image.fromarray(over).save(base + "_overlay.png") + + @classmethod + async def run(cls, + images: list[str] | str | None = None, + time: datetime.datetime | None = None, + model: str | str = "cpsam", + diameter: float | None = None, + flow_threshold: float | float = 0.4, + cellprob_threshold: float | float = 0.0, ): + + if time is None: + return [False, "No time received"] + + if images is None: + return [False, "No images received"] + + message = [f"Using {model} model"] + + model = models.CellposeModel(gpu=True, model_type=model) + files = images + imgs = [imread(f) for f in files] + masks, flows, styles = model.eval( + imgs, + flow_threshold=flow_threshold, + cellprob_threshold=cellprob_threshold, + diameter=diameter ) - ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") - outdir = os.path.join(os.path.dirname(__file__), "test_output", ts) - os.makedirs(outdir, exist_ok=True) # 自动创建目录 - for img, mask, flow, name in zip(imgs, masks, flows, files): - base = os.path.join(outdir, os.path.splitext(os.path.basename(name))[0]) - #使用内置绘图生成蒙版 - out = base + "_output" - save_masks(imgs, mask, flow, out, tif=True) + ts = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S") + outdir = os.path.join(os.path.dirname(__file__), "run_output", ts) + os.makedirs(outdir, exist_ok=True) # 自动创建目录 + for img, mask, flow, name in zip(imgs, masks, flows, files): + base = os.path.join(outdir, os.path.splitext(os.path.basename(name))[0]) + # 使用内置绘图生成蒙版 + out = base + "_output" + save_masks(imgs, mask, flow, out, tif=True) - # 用 plot 生成彩色叠加图(不依赖 skimage) - rgb = plot.image_to_rgb(img, channels=[0, 0]) # 原图转 RGB - over = plot.mask_overlay(rgb, masks=mask, colors=None) # 叠加彩色实例 - Image.fromarray(over).save(base + "_overlay.png") + # 用 plot 生成彩色叠加图(不依赖 skimage) + rgb = plot.image_to_rgb(img, channels=[0, 0]) # 原图转 RGB + over = plot.mask_overlay(rgb, masks=mask, colors=None) # 叠加彩色实例 + Image.fromarray(over).save(base + "_overlay.png") + + message.append(f"Output saved to: {outdir}") + return [True, message] \ No newline at end of file diff --git a/cp_train.py b/cp_train.py new file mode 100644 index 0000000..e69de29 diff --git a/flaskApp.py b/flaskApp.py new file mode 100644 index 0000000..762b714 --- /dev/null +++ b/flaskApp.py @@ -0,0 +1,40 @@ +from flask import Flask, send_from_directory, request, jsonify +import os, shutil, time, threading +from werkzeug.utils import secure_filename + +app = Flask(__name__) +UPLOAD_DIR = "./uploads" +os.makedirs(UPLOAD_DIR, exist_ok=True) + +def run_flask(): + app.run(host="10.147.18.141", port=5000) + +@app.route("/") +def index(): + return "

Hello

This is the backend of our cellpose server, please visit our website.

" + +@app.route("/testdl") +def test_download(): + return send_from_directory("./test_output/2025-09-16-20-03-51", "img_overlay.png", as_attachment=True) + +@app.route("/dl/") +def download(timestamp): + input_dir = os.path.join("./output", timestamp) + output_dir = os.path.join("./output/tmp", timestamp) # 不要加 .zip,make_archive 会自动加 + os.makedirs("./output/tmp", exist_ok=True) # 确保 tmp 存在 + shutil.make_archive(output_dir, 'zip', input_dir) + print(f"压缩完成: {output_dir}.zip") + return send_from_directory("./output/tmp", f"{timestamp}.zip", as_attachment=True) + + +@app.post("/upload") +def upload(): + files = request.files.getlist("files") # ← 前端用同一个键名多次 append + saved = [] + for f in files: + if not f or f.filename == "": + continue + name = secure_filename(f.filename) + f.save(os.path.join(UPLOAD_DIR, name)) + saved.append(name) + return jsonify({"ok": True, "count": len(saved), "files": saved}) \ No newline at end of file diff --git a/main.py b/main.py index 87ab091..f6ff9fa 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,10 @@ -import cp_run +from cp_run import Cprun +from flaskApp import run_flask +from multiprocessing import Process + if __name__ == "__main__": - cp_run.test() + # Cprun.run_test() + p = Process(target=run_flask) + p.start() + print(f"Flask running in PID {p.pid}") \ No newline at end of file