Flask - 常见应用部署方案

前言 开发调试阶段,运行 Flask 的方式多直接使用 app.run(),但 Flask 内置的 WSGI Server 的性能并不高。对于生产环境,一般使用 gunicorn。如果老项目并不需要多高的性能,而且用了很多单进程内的共享变量,使用 gunicorn 会影响不同会话间的通信,那么也可以试试直接用 gevent。 ...

2026年2月14日 · 8 分钟 · Rainux He

FastAPI 执行异步定时任务:基于数据库的分布式锁实现

前言 在 FastAPI 应用中执行定时任务,通常可以选择 celery,但 Celery 相对重量级,且需要依赖 Redis 或 RabbitMQ 等消息队列。当服务规模较小,且原本未使用 Redis 或 RabbitMQ 时,仅为定时任务功能引入这些依赖会增加额外的运维成本。 ...

2026年2月8日 · 9 分钟 · Rainux He

SQLAlchemy 中使用 UPSERT

前言 SQLite 和 PostgreSQL 都支持 UPSERT 操作,即"有则更新,无则新增"。冲突列必须有唯一约束。 语法: PostgreSQL: INSERT ... ON CONFLICT (column) DO UPDATE/NOTHING SQLite: INSERT ... ON CONFLICT(column) DO UPDATE/NOTHING。注意括号位置 场景 PostgreSQL SQLite 说明 基本 UPSERT ON CONFLICT (col) DO UPDATE SET ... ON CONFLICT(col) DO UPDATE SET ... 括号位置略有不同 冲突忽略 ON CONFLICT (col) DO NOTHING ON CONFLICT(col) DO NOTHING 相同 引用新值 EXCLUDED.col excluded.col PostgreSQL 大写,SQLite 小写 返回结果 RETURNING * RETURNING * 相同 条件更新 WHERE condition 不支持 WHERE SQLite 限制 注意事项 冲突列必须有唯一约束 PostgreSQL 和 SQLite 的语法相似,但仍有细微差别。使用原生 SQL 时需要注意。 SQLite 在 UPSERT 时不支持 WHERE 子句,需要改用 CASE 表达式或应用层过滤。 SQLite 3.35+ 版本才支持 RETURNING EXCLUDED 和 RETURNING EXCLUDED EXCLUDED 表示冲突时被拦截的新值。 ...

2026年2月8日 · 6 分钟 · Rainux He

Flask - Tracking ID的设计

前言 在实际业务中,根据 tracking_id 追溯一条请求的完整处理路径是比较常见的需求。借助 Flask 自带的全局对象 g 以及钩子函数,可以很容易地为每条请求添加 tracking_id,并在日志中自动记录。 ...

2026年1月17日 · 7 分钟 · Rainux He

FastAPI - 在异步方法中调用同步方法

前言 在异步方法中直接调用同步方法会阻塞整个事件循环,导致应用在执行同步方法期间无法处理任何其他并发请求,严重影响服务的整体性能和响应能力。 为了解决这个问题,核心思路是将同步方法交给外部线程池或进程池执行,避免阻塞主事件循环。 方法 1:使用 asyncio.to_thread Python 3.9 及以后版本可以使用 asyncio.to_thread 方法,将同步函数运行在独立的线程中,并返回一个可供 await 的协程对象 import asyncio import time from fastapi import FastAPI app = FastAPI() def sync_task(name: str): time.sleep(2) return f"Hello {name}, sync task done!" @app.get("/async-call") async def async_endpoint(): result = await asyncio.to_thread(sync_task, "World") return {"message": result} 方法 2:直接定义同步路由 FastAPI 支持定义同步路由,FastAPI 会自动在一个外部线程池中运行该函数。不过出于代码整体设计和一致性的考虑,不建议在异步项目中混用同步路由。 方法 3:使用 run_in_threadpool FastAPI 基于 Starlette,而 Starlette 提供了一个工具函数 run_in_threadpool,这种方式类似于 asyncio.to_thread,在某些老版本的 FastAPI 或特定的 contextvars 传递场景下更常用。 from fastapi.concurrency import run_in_threadpool @app.get("/method3") async def starlette_endpoint(): result = await run_in_threadpool(sync_task, "Starlette") return {"message": result} 方法 4:使用进程池 对于 CPU 密集型任务,应该使用多进程 ProcessPoolExecutor 来处理 import concurrent.futures import math from fastapi import FastAPI app = FastAPI() # 创建一个全局进程池 executor = concurrent.futures.ProcessPoolExecutor() def cpu_intensive_calculation(n: int): # 模拟重度 CPU 计算 return sum(math.isqrt(i) for i in range(n)) @app.get("/cpu-bound-task") async def cpu_task(): loop = asyncio.get_running_loop() result = await loop.run_in_executor(executor, cpu_intensive_calculation, 10**7) return {"result": result}

2026年1月6日 · 1 分钟 · Rainux He

Mcp-01: 简介与概念

前言 所有示例代码我都上传到 Git Repo 了,有需要的话可以直接 clone。https://github.com/rainuxhe/mcp-examples 简介 MCP(全称为Model Context Protocol,模型上下文协议)是一种面向大模型交互过程的通用上下文协议标准。其核心目标在于为模型构建一个结构化、可控、可扩展的语义执行环境,使语言模型能够在统一的上下文管理体系下进行任务调度、工具调用、资源协作与状态保持,从而突破传统Prompt Engineering在多轮交互、指令组合与行为稳定性方面的瓶颈。 在传统的大模型应用中,模型本身只能被动地接收输入、产生输出,要让它调用外部工具或访问自定义的上下文,就需要在代码里逐条写好 API 调用、认证、错误处理的逻辑,既繁琐又难以维护。MCP的初衷,就是将这些"上下文管理"和"工具调用"能力抽象成一个标准化的通信协议,让大模型应用只需关注"我想用什么资源",由专门的 MCP 服务端来真正执行调用、管理状态、返回结果。 MCP 官方GitHub 有一种说法是,传统的大模型应用叫做Prompt Engineering,而MCP出现后,大模型应用开发应该叫做Context Engineering。传统的提示工程常常依赖于简单的字符串拼接,这种方式有几个问题: 歧义性:模型可能难以区分哪些是指令,哪些是用户输入,哪些是检索到的数据。 提示注入风险:如果提示中包含恶意指令,例如ignore all previous instructions,模型可能被欺骗。 脆弱性:格式的微小变化(比如多一个换行符)都可能导致模型性能下降。 难以维护:当上下文变得更复杂时(例如,多个数据源、工具定义、历史消息),这种拼接方式会变得一团糟(亲身体验,塞了一堆历史消息后,模型的回答越拐越远) 核心概念 Tools(工具) 工具是AI模型可以调用以执行特定操作的函数。它们允许模型与外部系统交互,执行有副作用的操作,如: 调用API获取实时数据 查询或修改数据库 执行代码或脚本 发送邮件或消息 文件操作 工具由模型控制,这意味着AI决定是否以及何时使用它们。工具调用可能会产生副作用,其结果可以反馈到对话中。 Resources(资源) 资源是提供给模型的只读上下文单元(数据源)。它们可以是: 文件内容 数据库记录 API响应 知识库内容 资源由应用程序控制,托管方或开发人员决定公开哪些数据以及如何公开。读取资源没有副作用,类似于仅获取数据的GET请求。资源提供可在需要时注入模型上下文的内容(例如,在问答场景中检索到的文档)。 Prompts(提示模板) 提示模板是可重复使用的提示模板或指令,可以根据需要调用。它们由用户控制或由开发人员预定义。提示可能包含常见任务或指导性工作流程的模板(例如,代码审查模板或问答格式)。 提示模板的关键特性包括: 参数化:支持动态参数输入 资源整合:可嵌入资源上下文供模型参考 多轮交互:支持构建多轮对话流程 统一发现:通过标准接口注册和调用 Sampling(采样) 采样是工具与LLM交互以生成文本的机制。通过采样,工具可以请求LLM生成文本内容,例如生成诗歌、文章或其他文本内容。采样允许工具利用LLM的能力来创建内容,而不仅限于执行预定义的操作。 Elicitation(征询) 征询是一种允许工具向用户请求额外信息或确认的机制。当工具执行过程中需要更多信息才能继续执行时,可以使用征询功能与用户交互。这在处理需要用户确认或提供额外参数的操作时特别有用。 例如,在预订系统中,如果用户请求的日期已满,工具可以征询用户是否愿意选择其他日期。征询机制确保了工具可以在必要时暂停执行,等待用户输入,从而提供更好的用户体验。 征询的关键特性包括: 交互性:允许工具与用户进行双向沟通 验证:可以对用户输入进行验证,确保数据的正确性 可选性:用户可以选择接受、拒绝或取消征询请求 结构化:支持结构化数据输入,便于处理复杂信息 Roots(根) Root表示一次语义执行的起点,携带资源引用、执行目标、响应格式等信息,支持多并发执行流。它作为语义执行的基础输入结构,可以包含多个Prompt和工具,为模型提供完整的上下文环境。 Logging(日志记录) 日志记录是MCP中的一个重要功能,允许服务器和工具向客户端发送日志信息。通过日志记录,开发者可以跟踪工具执行过程、调试问题以及监控系统状态。MCP支持多种日志级别,包括调试(debug)、信息(info)、警告(warning)和错误(error)等。 Notifications(通知) 通知机制允许服务器向客户端发送实时更新信息,例如资源变更、工具列表更新等。通过通知,客户端可以及时了解服务器状态的变化,并相应地更新用户界面或执行其他操作。常见的通知类型包括资源更新通知、工具列表变更通知、提示列表变更通知等。 组件 MCP Server Server 是一个独立的程序或服务,它通过 MCP 协议向 MCP 客户端暴露特定的功能、工具或数据资源。 ...

2025年12月25日 · 1 分钟 · Rainux He

FastAPI - Tracking ID的设计

前言 在实际业务中,根据 tracking_id 追查日志中一条请求的完整处理路径是一个比较常见的需求。不过 FastAPI 官方并没有提供相对应的功能,因此需要开发者自行实现。本文介绍如何基于 contextvars,为每次请求的完整流程都添加一个 tracking_id,并在日志中自动记录。 什么是 contextvars Python 在 3.7 版本的标准库中加入了一个模块 contextvars,顾名思义就是 “(Context Variables) 上下文变量”,通常用来隐式地传递一些环境信息的变量,其作用跟 threading.local() 比较相似。不过 threading.local() 是针对线程的,隔离线程之间的数据状态,而 contextvars 可以用在 asyncio 生态的异步协程中。PS: contextvars 不仅可以用在异步协程中,也可以替代 threading.local() 用在多线程函数中。 基本使用 首先编写 context.py import contextvars from typing import Optional TRACKING_ID: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( 'tracking_id', default=None ) def get_tracking_id() -> Optional[str]: """用于依赖注入""" return TRACKING_ID.get() 编写中间件 middlewares.py,在请求头和响应头中添加 tracking_id 的信息。常见场景就是客户拿着 tracking_id 找碴。 import uuid from starlette.middleware.base import (BaseHTTPMiddleware, RequestResponseEndpoint) from starlette.requests import Request from starlette.responses import Response from context import TRACKING_ID class TrackingIDMiddleware(BaseHTTPMiddleware): async def dispatch( self, request: Request, call_next: RequestResponseEndpoint ) -> Response: tracking_id = str(uuid.uuid4()) token = TRACKING_ID.set(tracking_id) # HTTP 请求头习惯于使用 latin-1 编码 request.scope["headers"].append((b"x-request-id", tracking_id.encode("latin-1"))) try: resp = await call_next(request) finally: # 无论是否成功,每次请求结束时重置 tracking_id,避免泄露到下一次的请求中 TRACKING_ID.reset(token) # 可选, 在响应中设置跟踪 ID 头 resp.headers["X-Tracking-ID"] = tracking_id return resp 编写 handler 函数 handlers.py,测试在 handler 函数中获取 tracking_id。 import asyncio from context import TRACKING_ID async def mock_db_query(): await asyncio.sleep(1) current_id = TRACKING_ID.get() print(f"This is mock_db_query. Current tracking ID: {current_id}") await asyncio.sleep(1) 编写主函数 main.py import uvicorn from fastapi import Depends, FastAPI from fastapi.responses import PlainTextResponse from starlette.background import BackgroundTasks from context import TRACKING_ID, get_tracking_id from handlers import mock_db_query from middlewares import TrackingIDMiddleware app = FastAPI() app.add_middleware(TrackingIDMiddleware) @app.get("/qwer") async def get_qwer(): """测试上下文变量传递""" current_id = TRACKING_ID.get() print(f"This is get qwer. Current tracking ID: {current_id}") return PlainTextResponse(f"Current tracking ID: {current_id}") @app.get("/asdf") async def get_asdf(tracking_id: str = Depends(get_tracking_id)): """测试依赖注入""" print(f"This is get asdf. tracking ID: {tracking_id}") await mock_db_query() return PlainTextResponse(f"Get request, tracking ID: {tracking_id}") if __name__ == "__main__": uvicorn.run("main:app", host="127.0.0.1", port=8000, workers=4) 启动服务后用 curl 测试 api,在控制台可以看到 tracking_id 在请求中都能捕获到。 This is get qwer. Current tracking ID: 01b0153f-4877-4ca0-ac35-ed88ab406452 INFO: 127.0.0.1:55708 - "GET /qwer HTTP/1.1" 200 OK This is get asdf. tracking ID: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639 This is mock_db_query. Current tracking ID: 0be61d8d-11a0-4cb6-812f-51b9bfdc2639 INFO: 127.0.0.1:55722 - "GET /asdf HTTP/1.1" 200 OK 使用 curl 的控制台输出 ...

2025年12月11日 · 7 分钟 · Rainux He