基于 Supabase 构建高并发物联网架构:从协议选型到削峰填谷
在物联网开发中,将海量设备数据接入现代化的后端即服务(BaaS)平台(如 Supabase)是一个极具挑战的任务。Supabase 基于 PostgreSQL,提供了强大的数据库、身份认证和实时功能,但其原生的 HTTP/REST 接口并非为物联网场景下的“海量长连接”和“高频写入”而设计。
本文将深入探讨如何结合 MQTT、消息队列和 Supabase 边缘函数,构建一个既能支撑百万级并发,又能充分利用 Supabase 开发效率优势的稳健架构,并提供核心代码示例。
核心挑战:为什么不能直接用 HTTP 直连 Supabase?
很多开发者在初期会尝试让设备直接通过 HTTP POST 请求将数据发送给 Supabase API。在小规模测试(如几十台设备)时,这种方案看似可行,但在生产环境中会迅速遇到瓶颈。
协议开销与性能对比
物联网设备通常受限于电池电量和网络带宽(如 4G/NB-IoT)。HTTP 协议在设计上是为请求-响应模式服务的,而 MQTT 则是为发布-订阅模式设计的。
| 特性 | HTTP/HTTPS (Webhook) | MQTT | 物联网影响 |
|---|---|---|---|
| 连接模式 | 短连接(通常):频繁握手/断开,延迟高(200ms+)。 | 长连接:一次握手,永久保持,延迟极低(<50ms)。 | HTTP 会迅速耗尽设备电量和服务器端口资源。 |
| 带宽占用 | 重:头部包含大量文本元数据(Cookie, User-Agent)。 | 极轻:二进制头部,最小仅 2 字节。 | MQTT 能显著降低流量成本,适合弱网环境。 |
| 并发能力 | 服务器通常支持数千至数万并发。 | 单集群可轻松支撑百万至千万级并发连接。 | 只有 MQTT 能满足大规模设备接入需求。 |
Supabase 的承载极限
Supabase 的后端本质是 PostgreSQL 数据库。
- 连接数限制:PostgreSQL 的最大连接数通常在几百到一千左右。如果 1 万台设备通过 HTTP 频繁请求,数据库连接池会瞬间耗尽。
- 写入吞吐量:即使是高性能的 Pro 版实例,直接写入的 TPS(每秒事务数)也有限。面对每秒数万条的传感器数据洪流,直接写入会导致数据库锁死或响应超时。
结论:在物联网场景下,绝对不要让设备直接通过 HTTP 高频访问 Supabase API。
架构演进:引入“中间层”
为了解决上述问题,我们需要引入“中间人”来负责连接管理和流量缓冲。

方案一:MQTT + 边缘函数(适合中小规模)
这是最轻量级的 Serverless 方案。利用云厂商的 MQTT 服务(如阿里云 IoT 平台、EMQX Cloud)作为接入点,通过 Webhook 触发 Supabase 边缘函数。
- 流程:设备发布 MQTT 消息 -> 云厂商 MQTT 服务 -> 触发 HTTP Webhook -> Supabase 边缘函数 -> 写入数据库。
- 优点:无需维护服务器,开发极快。
- 缺点:如果设备瞬间上报量极大,边缘函数会被频繁调用,可能触发 Supabase 的速率限制或产生高额费用。
方案二:MQTT + 消息队列 + 批量消费(工业级高并发架构)
这是支撑百万级并发的标准架构。核心思想是削峰填谷:使用高性能消息队列作为“水库”,将设备数据的洪峰拦截下来,然后由消费者平滑地写入 Supabase。
推荐架构详解:削峰填谷模式
在这个架构中,Supabase 专注于数据存储和业务管理,而繁重的接入和缓冲工作交由专用组件处理。
架构流程描述
-
设备接入层(MQTT Broker)
- 维持百万级设备的 TCP 长连接,处理设备上下线通知、心跳保活。
- 建议使用托管服务(如阿里云 IoT Core、腾讯云 MQTT)。
-
流量缓冲层(消息队列)
- 作为“蓄水池”。当设备数据瞬间激增(如突发报警)时,队列将数据暂存,保护后端数据库不被击穿。
- 建议使用 Kafka、RocketMQ 或云厂商的消息队列服务。
-
数据处理层(消费者)
- 这是连接物联网世界与 Supabase 的桥梁。
- 关键逻辑:批量写入。消费者不应来一条写一条,而应从队列中一次性拉取 500-1000 条数据,组装成一个 INSERT 语句写入 Supabase。
- 可以使用 Supabase 边缘函数(如果支持长轮询队列),或者部署一个独立的轻量级服务(如 Go/Netty 编写)。
-
Supabase 存储层
- 存储清洗后的业务数据、设备元数据、用户信息。
- 利用 PostgreSQL 的强大查询能力进行数据分析,利用 Supabase Auth 管理设备管理员权限。
代码实战:边缘函数中的批量写入逻辑
假设我们使用一个消费者服务(可以是边缘函数,也可以是独立服务)从队列获取数据并写入 Supabase。
TypeScript 示例(Supabase 边缘函数风格)
// supabase/functions/iot-consumer/index.ts
import { serve } from "https://deno.land/std@0.168.0/http/server.ts"
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'
// 模拟从消息队列拉取数据的函数
// 在实际场景中,这里会调用 Kafka/RocketMQ 的 API
async function pollMessageQueue() {
// 模拟网络请求延迟
await new Promise(resolve => setTimeout(resolve, 100))
// 模拟返回 5 条数据
return [
{ deviceId: "dev_001", temp: 24.5, hum: 60 },
{ deviceId: "dev_002", temp: 25.1, hum: 58 },
{ deviceId: "dev_001", temp: 24.6, hum: 61 },
{ deviceId: "dev_003", temp: 23.9, hum: 62 },
{ deviceId: "dev_002", temp: 25.2, hum: 57 },
]
}
serve(async (req) => {
// 1. 从消息队列批量拉取数据
const messages = await pollMessageQueue()
if (messages.length === 0) {
return new Response(JSON.stringify({ message: "No messages" }))
}
// 2. 初始化 Supabase 客户端
const supabase = createClient(
Deno.env.get('SUPABASE_URL')!,
Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
)
// 3. 数据转换:将 MQTT 消息转换为数据库行格式
const rows = messages.map(msg => ({
device_id: msg.deviceId,
temperature: msg.temp,
humidity: msg.hum,
recorded_at: new Date().toISOString()
}))
try {
// 4. 关键:批量插入!
// 将多次网络请求合并为1次,极大减轻数据库压力
const { data, error } = await supabase
.from('sensor_readings')
.insert(rows)
if (error) throw error
return new Response(JSON.stringify({
success: true,
insertedCount: rows.length
}))
} catch (err) {
return new Response(JSON.stringify({ error: err.message }), { status: 500 })
}
})
Python 示例(独立消费者服务风格)
# consumer.py
import time
import os
from supabase import create_client, Client
# 1. 初始化 Supabase 客户端
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(url, key)
def poll_message_queue():
# 模拟从 Kafka/RocketMQ 拉取数据
# 实际代码中这里会使用 confluent_kafka 或 rocketmq 库
time.sleep(0.1)
return [
{"deviceId": "dev_001", "temp": 24.5, "hum": 60},
{"deviceId": "dev_002", "temp": 25.1, "hum": 58},
]
def consume_loop():
print("Starting consumer loop...")
while True:
# 2. 批量拉取
messages = poll_message_queue()
if not messages:
time.sleep(1) # 无数据时休眠,避免空转
continue
# 3. 数据转换
rows = []
for msg in messages:
rows.append({
"device_id": msg["deviceId"],
"temperature": msg["temp"],
"humidity": msg["hum"],
"recorded_at": time.strftime("%Y-%m-%d %H:%M:%S")
})
# 4. 批量写入 Supabase
try:
data = supabase.table("sensor_readings").insert(rows).execute()
print(f"Successfully inserted {len(rows)} records")
except Exception as e:
print(f"Failed to insert: {e}")
# 模拟确认消息已处理 (ACK)
if __name__ == "__main__":
consume_loop()
总结与建议
| 关注点 | 建议方案 | 理由 |
|---|---|---|
| 通信协议 | MQTT | 相比 HTTP,它是长连接、二进制协议,极度节省带宽和电量,适合海量并发。 |
| 接入方式 | 云厂商 IoT 平台 | 不要自建 Netty/Mosquitto 服务器,除非有极特殊需求。云托管服务能解决运维难题。 |
| 写入策略 | 消息队列 + 批量消费 | 這是解决“高并发写入”的唯一正解。直接写库必死无疑。 |
| Supabase 角色 | 数据仓库 + 业务后台 | 用它来存数据、管用户、做 API,但不要让它直接面对设备端的流量洪峰。 |
通过这种架构,你既享受了 Supabase 带来的极速后端开发体验,又通过标准的物联网中间件解决了高并发和协议适配问题,实现了真正的“鱼和熊掌兼得”。