基于 Supabase 构建高并发物联网架构:从协议选型到削峰填谷

在物联网开发中,将海量设备数据接入现代化的后端即服务(BaaS)平台(如 Supabase)是一个极具挑战的任务。Supabase 基于 PostgreSQL,提供了强大的数据库、身份认证和实时功能,但其原生的 HTTP/REST 接口并非为物联网场景下的“海量长连接”和“高频写入”而设计。

本文将深入探讨如何结合 MQTT、消息队列和 Supabase 边缘函数,构建一个既能支撑百万级并发,又能充分利用 Supabase 开发效率优势的稳健架构,并提供核心代码示例。


核心挑战:为什么不能直接用 HTTP 直连 Supabase?

很多开发者在初期会尝试让设备直接通过 HTTP POST 请求将数据发送给 Supabase API。在小规模测试(如几十台设备)时,这种方案看似可行,但在生产环境中会迅速遇到瓶颈。

协议开销与性能对比

物联网设备通常受限于电池电量和网络带宽(如 4G/NB-IoT)。HTTP 协议在设计上是为请求-响应模式服务的,而 MQTT 则是为发布-订阅模式设计的。

特性HTTP/HTTPS (Webhook)MQTT物联网影响
连接模式短连接(通常):频繁握手/断开,延迟高(200ms+)。长连接:一次握手,永久保持,延迟极低(<50ms)。HTTP 会迅速耗尽设备电量和服务器端口资源。
带宽占用重:头部包含大量文本元数据(Cookie, User-Agent)。极轻:二进制头部,最小仅 2 字节。MQTT 能显著降低流量成本,适合弱网环境。
并发能力服务器通常支持数千至数万并发。单集群可轻松支撑百万至千万级并发连接。只有 MQTT 能满足大规模设备接入需求。

Supabase 的承载极限

Supabase 的后端本质是 PostgreSQL 数据库。

  • 连接数限制:PostgreSQL 的最大连接数通常在几百到一千左右。如果 1 万台设备通过 HTTP 频繁请求,数据库连接池会瞬间耗尽。
  • 写入吞吐量:即使是高性能的 Pro 版实例,直接写入的 TPS(每秒事务数)也有限。面对每秒数万条的传感器数据洪流,直接写入会导致数据库锁死或响应超时。

结论:在物联网场景下,绝对不要让设备直接通过 HTTP 高频访问 Supabase API。


架构演进:引入“中间层”

为了解决上述问题,我们需要引入“中间人”来负责连接管理和流量缓冲。
Supabase-IoT-Architecture.png

方案一:MQTT + 边缘函数(适合中小规模)

这是最轻量级的 Serverless 方案。利用云厂商的 MQTT 服务(如阿里云 IoT 平台、EMQX Cloud)作为接入点,通过 Webhook 触发 Supabase 边缘函数。

  • 流程:设备发布 MQTT 消息 -> 云厂商 MQTT 服务 -> 触发 HTTP Webhook -> Supabase 边缘函数 -> 写入数据库。
  • 优点:无需维护服务器,开发极快。
  • 缺点:如果设备瞬间上报量极大,边缘函数会被频繁调用,可能触发 Supabase 的速率限制或产生高额费用。

方案二:MQTT + 消息队列 + 批量消费(工业级高并发架构)

这是支撑百万级并发的标准架构。核心思想是削峰填谷:使用高性能消息队列作为“水库”,将设备数据的洪峰拦截下来,然后由消费者平滑地写入 Supabase。


推荐架构详解:削峰填谷模式

在这个架构中,Supabase 专注于数据存储和业务管理,而繁重的接入和缓冲工作交由专用组件处理。

架构流程描述

  1. 设备接入层(MQTT Broker)

    • 维持百万级设备的 TCP 长连接,处理设备上下线通知、心跳保活。
    • 建议使用托管服务(如阿里云 IoT Core、腾讯云 MQTT)。
  2. 流量缓冲层(消息队列)

    • 作为“蓄水池”。当设备数据瞬间激增(如突发报警)时,队列将数据暂存,保护后端数据库不被击穿。
    • 建议使用 Kafka、RocketMQ 或云厂商的消息队列服务。
  3. 数据处理层(消费者)

    • 这是连接物联网世界与 Supabase 的桥梁。
    • 关键逻辑批量写入。消费者不应来一条写一条,而应从队列中一次性拉取 500-1000 条数据,组装成一个 INSERT 语句写入 Supabase。
    • 可以使用 Supabase 边缘函数(如果支持长轮询队列),或者部署一个独立的轻量级服务(如 Go/Netty 编写)。
  4. Supabase 存储层

    • 存储清洗后的业务数据、设备元数据、用户信息。
    • 利用 PostgreSQL 的强大查询能力进行数据分析,利用 Supabase Auth 管理设备管理员权限。

代码实战:边缘函数中的批量写入逻辑

假设我们使用一个消费者服务(可以是边缘函数,也可以是独立服务)从队列获取数据并写入 Supabase。

TypeScript 示例(Supabase 边缘函数风格)

// supabase/functions/iot-consumer/index.ts
import { serve } from "https://deno.land/std@0.168.0/http/server.ts"
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'

// 模拟从消息队列拉取数据的函数
// 在实际场景中,这里会调用 Kafka/RocketMQ 的 API
async function pollMessageQueue() {
  // 模拟网络请求延迟
  await new Promise(resolve => setTimeout(resolve, 100))
  
  // 模拟返回 5 条数据
  return [
    { deviceId: "dev_001", temp: 24.5, hum: 60 },
    { deviceId: "dev_002", temp: 25.1, hum: 58 },
    { deviceId: "dev_001", temp: 24.6, hum: 61 },
    { deviceId: "dev_003", temp: 23.9, hum: 62 },
    { deviceId: "dev_002", temp: 25.2, hum: 57 },
  ]
}

serve(async (req) => {
  // 1. 从消息队列批量拉取数据
  const messages = await pollMessageQueue()
  
  if (messages.length === 0) {
    return new Response(JSON.stringify({ message: "No messages" }))
  }

  // 2. 初始化 Supabase 客户端
  const supabase = createClient(
    Deno.env.get('SUPABASE_URL')!,
    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
  )

  // 3. 数据转换:将 MQTT 消息转换为数据库行格式
  const rows = messages.map(msg => ({
    device_id: msg.deviceId,
    temperature: msg.temp,
    humidity: msg.hum,
    recorded_at: new Date().toISOString()
  }))

  try {
    // 4. 关键:批量插入!
    // 将多次网络请求合并为1次,极大减轻数据库压力
    const { data, error } = await supabase
      .from('sensor_readings')
      .insert(rows)

    if (error) throw error
    
    return new Response(JSON.stringify({ 
      success: true, 
      insertedCount: rows.length 
    }))
  } catch (err) {
    return new Response(JSON.stringify({ error: err.message }), { status: 500 })
  }
})

Python 示例(独立消费者服务风格)

# consumer.py
import time
import os
from supabase import create_client, Client

# 1. 初始化 Supabase 客户端
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(url, key)

def poll_message_queue():
    # 模拟从 Kafka/RocketMQ 拉取数据
    # 实际代码中这里会使用 confluent_kafka 或 rocketmq 库
    time.sleep(0.1) 
    return [
        {"deviceId": "dev_001", "temp": 24.5, "hum": 60},
        {"deviceId": "dev_002", "temp": 25.1, "hum": 58},
    ]

def consume_loop():
    print("Starting consumer loop...")
    while True:
        # 2. 批量拉取
        messages = poll_message_queue()
        
        if not messages:
            time.sleep(1) # 无数据时休眠,避免空转
            continue

        # 3. 数据转换
        rows = []
        for msg in messages:
            rows.append({
                "device_id": msg["deviceId"],
                "temperature": msg["temp"],
                "humidity": msg["hum"],
                "recorded_at": time.strftime("%Y-%m-%d %H:%M:%S")
            })

        # 4. 批量写入 Supabase
        try:
            data = supabase.table("sensor_readings").insert(rows).execute()
            print(f"Successfully inserted {len(rows)} records")
        except Exception as e:
            print(f"Failed to insert: {e}")
        
        # 模拟确认消息已处理 (ACK)

if __name__ == "__main__":
    consume_loop()

总结与建议

关注点建议方案理由
通信协议MQTT相比 HTTP,它是长连接、二进制协议,极度节省带宽和电量,适合海量并发。
接入方式云厂商 IoT 平台不要自建 Netty/Mosquitto 服务器,除非有极特殊需求。云托管服务能解决运维难题。
写入策略消息队列 + 批量消费這是解决“高并发写入”的唯一正解。直接写库必死无疑。
Supabase 角色数据仓库 + 业务后台用它来存数据、管用户、做 API,但不要让它直接面对设备端的流量洪峰。

通过这种架构,你既享受了 Supabase 带来的极速后端开发体验,又通过标准的物联网中间件解决了高并发和协议适配问题,实现了真正的“鱼和熊掌兼得”。