传感器及相关程序控制
综合环境传感器集成与数据采集系统笔记本笔记详细介绍了光照、温湿度、土壤综合传感器的接线、测试、程序实现、数据库存储、MQTT发布及注意事项适合环境监测、智慧农业等场景的传感器集成开发与工程落地。一、光照传感器1. 光照传感器图片2. 光照传感器测试步骤通过“此电脑-管理”查看设备端口信息。在程序中找到对应串口号点击测试波特率显示设备地址可后续更改。波特率默认9600可根据需求自定义修改。点击“查询”按钮实时显示光照度。3. 相关程序内容1程序运行从站地址、寄存器地址# 光照传感器从站地址 2 LIGHT_SLAVE 2 LIGHT_REGISTER_ADDR 0x0006 # 光照寄存器地址 # 注寄存器地址需自行查询如有误则无法获取数据2程序运行主体内容# 光照从站2 def read_light(ser): 读取光照强度Lux clear_serial_buffer(ser) cmd build_read_command(LIGHT_SLAVE, LIGHT_REGISTER_ADDR, 1) logger.debug(f发送光照命令从站{LIGHT_SLAVE}{cmd.hex()}) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 7: logger.error(f光照响应太短{len(resp)}字节{resp.hex()}) return None regs parse_read_response(resp, LIGHT_SLAVE, 1) if regs is None: logger.error(f解析光照失败{resp.hex()}) return None light_value regs[0] if light_value 0: logger.warning(光照值为0请检查传感器是否被遮挡或接线异常) logger.info(f光照强度{light_value} Lux) return light_value二、温湿度变送器1. 温湿度变送器图片2. 温湿度变送器测试步骤测试步骤与光照传感器完全一致使用同一程序和操作流程。点击“查询”即可查看温湿度数据。3. 相关程序内容1程序运行从站地址、寄存器地址# 温湿度传感器从站地址 101 TEMP_HUMI_SLAVE 101 TEMP_HUMI_START_ADDR 0x0000 # 湿度(0x0000)温度(0x0001) # 注寄存器地址需自行查询如有误则无法获取数据2程序运行主体部分三、土壤# 温湿度从站101 def read_temp_humi(ser): 读取空气温度和湿度 clear_serial_buffer(ser) # 清除之前可能残留的旧数据避免将上次的响应误当作本次响应 cmd build_read_command(TEMP_HUMI_SLAVE, TEMP_HUMI_START_ADDR, 2) logger.debug(f发送温湿度命令{cmd.hex()}) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 9: logger.error(f温湿度响应太短{len(resp)}字节{resp.hex()}) return None, None # Modbus 正确的响应帧至少 9 字节。长度不足说明通信失败可能传感器未响应、接线错误、从站地址不对、波特率不匹配等 regs parse_read_response(resp, TEMP_HUMI_SLAVE, 2) if regs is None: logger.error(f解析温湿度失败{resp.hex()}) return None, None humi_raw, temp_raw regs humidity humi_raw / 10.0 if temp_raw 32767: temperature (temp_raw - 65536) / 10.0 else: temperature temp_raw / 10.0 logger.info(f空气温度{temperature:.1f}°C空气湿度{humidity:.1f}%) return temperature, humidity综合传感器1. 土壤综合传感器图片2. 土壤综合传感器测试步骤通过“此电脑-管理”查看端口信息。在程序中找到对应串口号点击“自动获取当前波特率和地址”可获取当前设备波特率和设备地址均可根据需求更改。点击“连接设备”连接成功后默认显示土壤PH值。如需查看其他参数可通过下拉列表选择相应内容。3. 相关程序内容1程序运行从站地址、寄存器地址# 土壤综合传感器从站地址 1 SOIL_SLAVE 1 SOIL_MOISTURE_ADDR 0x0012 # 土壤湿度 SOIL_TEMP_ADDR 0x0013 # 土壤温度 SOIL_EC_ADDR 0x0015 # 土壤电导率 SOIL_N_ADDR 0x001E # 土壤氮 SOIL_P_ADDR 0x001F # 土壤磷 SOIL_K_ADDR 0x0020 # 土壤钾 SOIL_PH_ADDR 0x0006 # 土壤酸碱度从站1 # 注寄存器地址需自行查询如有误则无法获取数据2程序运行主体部分# 土壤传感器从站1 def read_soil_parameter(ser, name, addr, divide_factor1, retry2): 读取单个土壤参数支持重试 for attempt in range(retry): clear_serial_buffer(ser) cmd build_read_command(SOIL_SLAVE, addr, 1) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 7: logger.error(f读取{name}响应太短{len(resp)}字节{resp.hex()}) continue regs parse_read_response(resp, SOIL_SLAVE, 1) if regs is None: logger.error(f解析{name}失败{resp.hex()}) continue raw regs[0] value raw / divide_factor if divide_factor ! 1 else raw logger.debug(f土壤{name}原始值{raw}转换后{value}) return value, False return None, True def read_soil_sensor(ser): 读取土壤所有参数湿度、温度、电导率、氮磷钾、酸碱度 soil_data { moisture: None, temp: None, ec: None, n: None, p: None, k: None, ph: None } tasks [ (湿度, SOIL_MOISTURE_ADDR, 10), (温度, SOIL_TEMP_ADDR, 10), (电导率, SOIL_EC_ADDR, 1), (氮含量, SOIL_N_ADDR, 1), (磷含量, SOIL_P_ADDR, 1), (钾含量, SOIL_K_ADDR, 1), (酸碱度, SOIL_PH_ADDR, 100) # PH值原始值/100 ] for name, addr, div in tasks: value, err read_soil_parameter(ser, name, addr, div, retry2) if not err: if name 湿度: soil_data[moisture] value elif name 温度: soil_data[temp] value elif name 电导率: soil_data[ec] value elif name 氮含量: soil_data[n] value elif name 磷含量: soil_data[p] value elif name 钾含量: soil_data[k] value elif name 酸碱度: soil_data[ph] value else: logger.warning(f土壤{name}读取失败) # 输出汇总 if soil_data[moisture] is not None: logger.info(f土壤湿度{soil_data[moisture]:.1f}%) if soil_data[temp] is not None: logger.info(f土壤温度{soil_data[temp]:.1f}°C) if soil_data[ec] is not None: logger.info(f土壤电导率{soil_data[ec]} μS/cm) if soil_data[n] is not None: logger.info(f土壤氮含量{soil_data[n]} mg/kg) if soil_data[p] is not None: logger.info(f土壤磷含量{soil_data[p]} mg/kg) if soil_data[k] is not None: logger.info(f土壤钾含量{soil_data[k]} mg/kg) if soil_data[ph] is not None: logger.info(f土壤酸碱度{soil_data[ph]:.2f}) return soil_data四、数据库上传与数据保存# 数据库文件 DB_FILE sensor_data.db # 数据库操作 def init_db(): 初始化数据库创建需要的表 conn sqlite3.connect(DB_FILE) cursor conn.cursor() cursor.execute( CREATE TABLE IF NOT EXISTS sensor_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, temperature REAL, humidity REAL, light_intensity INTEGER ) ) cursor.execute( CREATE TABLE IF NOT EXISTS soil_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, moisture REAL, temperature REAL, conductivity INTEGER, nitrogen INTEGER, phosphorus INTEGER, potassium INTEGER, ph REAL ) ) # 确保列存在兼容旧表 cursor.execute(PRAGMA table_info(soil_data)) columns [col[1] for col in cursor.fetchall()] if ph not in columns: cursor.execute(ALTER TABLE soil_data ADD COLUMN ph REAL) logger.info(已为土壤数据表添加酸碱度列) cursor.execute(PRAGMA table_info(sensor_data)) columns [col[1] for col in cursor.fetchall()] if light_intensity not in columns: cursor.execute(ALTER TABLE sensor_data ADD COLUMN light_intensity INTEGER) logger.info(已为空气数据表添加光照列) conn.commit() conn.close() logger.info(f数据库初始化完成{DB_FILE}) def save_temp_humi_light_to_db(temp, humi, light): 保存空气温湿度和光照到数据库 try: conn sqlite3.connect(DB_FILE) cursor conn.cursor() ts datetime.now().strftime(%Y-%m-%d %H:%M:%S) cursor.execute( INSERT INTO sensor_data (timestamp, temperature, humidity, light_intensity) VALUES (?, ?, ?, ?) , (ts, temp, humi, light)) conn.commit() except Exception as e: logger.error(f保存空气数据失败{e}) finally: conn.close() def save_soil_to_db(soil_data): 保存土壤数据到数据库 try: conn sqlite3.connect(DB_FILE) cursor conn.cursor() ts datetime.now().strftime(%Y-%m-%d %H:%M:%S) cursor.execute( INSERT INTO soil_data (timestamp, moisture, temperature, conductivity, nitrogen, phosphorus, potassium, ph) VALUES (?, ?, ?, ?, ?, ?, ?, ?) , ( ts, soil_data.get(moisture), soil_data.get(temp), soil_data.get(ec), soil_data.get(n), soil_data.get(p), soil_data.get(k), soil_data.get(ph) )) conn.commit() except Exception as e: logger.error(f保存土壤数据失败{e}) finally: conn.close()五、MQTT数据发布# MQTT 服务器配置 MQTT_BROKER MQTT_PORT 1883 MQTT_TOPIC MQTT_USER MQTT_PASSWORD # MQTT 发布字段名已中文化 def on_connect(client, userdata, flags, rc, propertiesNone): if rc 0: logger.info(MQTT 服务器连接成功) else: logger.error(fMQTT 连接失败返回码{rc}) def publish_all_data(client, temp, humi, light, soil_data): 将所有传感器数据发布到 MQTT 主题字段名为中文 if not client or not client.is_connected(): logger.warning(MQTT 未连接跳过数据发布) return payload { 设备: sensor_01, 时间: datetime.now().strftime(%Y-%m-%d %H:%M:%S) } if temp is not None: payload[温度] round(temp, 1) if humi is not None: payload[湿度] round(humi, 1) if light is not None: payload[光照] light soil_fields { 土壤湿度: soil_data.get(moisture), 土壤温度: soil_data.get(temp), 土壤电导率: soil_data.get(ec), 土壤氮含量: soil_data.get(n), 土壤磷含量: soil_data.get(p), 土壤钾含量: soil_data.get(k), 土壤酸碱度: soil_data.get(ph) } for key, val in soil_fields.items(): if val is not None: if isinstance(val, float): # 酸碱度保留两位小数其他保留一位 payload[key] round(val, 2 if key 土壤酸碱度 else 1) else: payload[key] val # 发布时 ensure_asciiFalse 保证中文正常显示 client.publish(MQTT_TOPIC, json.dumps(payload, ensure_asciiFalse), qos0) logger.debug(fMQTT 已发布{payload})六、主程序设计SERIAL_PORT COM10 # 串口号需根据自己实际情况进行更改 BAUDRATE 9600 # 波特率 # 根据自己实际情况进行配置波特率 # 主程序 def main(): logger.info( * 50) logger.info(综合传感器采集程序支持空气温湿度、光照、土壤多参数) logger.info(f串口{SERIAL_PORT}波特率{BAUDRATE}) logger.info(f空气温湿度传感器地址{TEMP_HUMI_SLAVE}) logger.info(f光照传感器地址{LIGHT_SLAVE}寄存器 0x{LIGHT_REGISTER_ADDR:04X}) logger.info(f土壤传感器地址{SOIL_SLAVE}酸碱度寄存器 0x{SOIL_PH_ADDR:04X}) logger.info(fMQTT 服务器{MQTT_BROKER}:{MQTT_PORT}主题{MQTT_TOPIC}) logger.info( * 50) init_db() # 初始化 MQTT mqtt_client None try: try: mqtt_client mqtt.Client(callback_api_versionmqtt.CallbackAPIVersion.VERSION2) except AttributeError: mqtt_client mqtt.Client() mqtt_client.on_connect on_connect if MQTT_USER: mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() time.sleep(1) except Exception as e: logger.error(fMQTT 初始化失败{e}) # 打开串口 try: ser serial.Serial( portSERIAL_PORT, baudrateBAUDRATE, bytesize8, parityN, stopbits1, timeout1 ) logger.info(f串口 {SERIAL_PORT} 打开成功) except Exception as e: logger.error(f无法打开串口 {SERIAL_PORT}{e}) sys.exit(1) success 0 fail 0 light_zero_count 0 try: while True: logger.info(正在读取传感器数据...) temp, humi read_temp_humi(ser) light read_light(ser) if light is not None and light 0: light_zero_count 1 if light_zero_count 3: logger.warning(光照值连续为0请检查传感器是否连接或光照条件) else: light_zero_count 0 soil_data read_soil_sensor(ser) if temp is not None or humi is not None or light is not None or any(soil_data.values()): success 1 logger.info(f✓ 读取成功成功次数{success}失败次数{fail}) if temp is not None: logger.info(f 空气温度{temp:.1f}°C空气湿度{humi:.1f}%) if light is not None: logger.info(f 光照强度{light} Lux) save_temp_humi_light_to_db(temp, humi, light) if any(soil_data.values()): save_soil_to_db(soil_data) if mqtt_client: publish_all_data(mqtt_client, temp, humi, light, soil_data) else: fail 1 logger.warning(f✗ 读取失败成功次数{success}失败次数{fail}) logger.warning( 请检查传感器供电、接线、串口参数及从站地址是否正确) time.sleep(INTERVAL) except KeyboardInterrupt: logger.info(用户按 CtrlC 停止程序) finally: ser.close() if mqtt_client: mqtt_client.loop_stop() mqtt_client.disconnect() logger.info(程序已退出)七、完整程序源码详细主程序与工具函数请参考下方源码实现便于直接复制、调试和二次开发。#!/usr/bin/env python3 # -*- coding: utf-8 -*- import sys import time import json import logging import sqlite3 import struct from datetime import datetime try: import serial except ImportError: print(错误缺少 pyserial 库请运行pip install pyserial) sys.exit(1) try: import paho.mqtt.client as mqtt except ImportError: print(错误缺少 paho-mqtt 库请运行pip install paho-mqtt) sys.exit(1) # 配置区域 SERIAL_PORT COM10 # 串口号Windows下为COM10 BAUDRATE 9600 # 波特率 # 温湿度传感器从站地址 101 TEMP_HUMI_SLAVE 101 TEMP_HUMI_START_ADDR 0x0000 # 湿度(0x0000)温度(0x0001) # 光照传感器从站地址 2 LIGHT_SLAVE 2 LIGHT_REGISTER_ADDR 0x0006 # 光照寄存器地址 # 土壤综合传感器从站地址 1 SOIL_SLAVE 1 SOIL_MOISTURE_ADDR 0x0012 # 土壤湿度 SOIL_TEMP_ADDR 0x0013 # 土壤温度 SOIL_EC_ADDR 0x0015 # 土壤电导率 SOIL_N_ADDR 0x001E # 土壤氮 SOIL_P_ADDR 0x001F # 土壤磷 SOIL_K_ADDR 0x0020 # 土壤钾 SOIL_PH_ADDR 0x0006 # 土壤酸碱度从站1 # MQTT 服务器配置 MQTT_BROKER MQTT_PORT 1883 MQTT_TOPIC MQTT_USER MQTT_PASSWORD # 数据库文件 DB_FILE sensor_data.db # 采集间隔秒 INTERVAL 5 # 日志设置输出中文 logging.basicConfig(levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s) logger logging.getLogger(__name__) # Modbus RTU 工具函数 def modbus_crc16(data: bytes) - bytes: 计算 Modbus CRC16 校验值低位在前 crc 0xFFFF for byte in data: crc ^ byte for _ in range(8): if crc 0x0001: crc (crc 1) ^ 0xA001 else: crc 1 return bytes([crc 0xFF, (crc 8) 0xFF]) def build_read_command(slave_id, start_addr, quantity): 构建读取保持寄存器的 Modbus 命令 cmd struct.pack(BBHH, slave_id, 0x03, start_addr, quantity) crc modbus_crc16(cmd) return cmd crc def parse_read_response(response, slave_id, expected_quantity): 解析 Modbus 响应返回寄存器值列表 if len(response) 5: return None if response[0] ! slave_id or response[1] ! 0x03: return None byte_count response[2] if len(response) 3 byte_count 2: return None data response[3:3byte_count] recv_crc response[-2] (response[-1] 8) calc_crc modbus_crc16(response[:-2]) calc_crc_val calc_crc[0] (calc_crc[1] 8) if recv_crc ! calc_crc_val: logger.warning(fCRC校验失败收到 0x{recv_crc:04X}计算 0x{calc_crc_val:04X}) return None registers [] for i in range(0, byte_count, 2): reg_val (data[i] 8) data[i1] registers.append(reg_val) if len(registers) ! expected_quantity: return None return registers def clear_serial_buffer(ser): 清空串口缓冲区避免旧数据干扰 ser.reset_input_buffer() ser.reset_output_buffer() # 温湿度从站101 def read_temp_humi(ser): 读取空气温度和湿度 clear_serial_buffer(ser) cmd build_read_command(TEMP_HUMI_SLAVE, TEMP_HUMI_START_ADDR, 2) logger.debug(f发送温湿度命令{cmd.hex()}) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 9: logger.error(f温湿度响应太短{len(resp)}字节{resp.hex()}) return None, None regs parse_read_response(resp, TEMP_HUMI_SLAVE, 2) if regs is None: logger.error(f解析温湿度失败{resp.hex()}) return None, None humi_raw, temp_raw regs humidity humi_raw / 10.0 if temp_raw 32767: temperature (temp_raw - 65536) / 10.0 else: temperature temp_raw / 10.0 logger.info(f空气温度{temperature:.1f}°C空气湿度{humidity:.1f}%) return temperature, humidity # 光照从站2 def read_light(ser): 读取光照强度Lux clear_serial_buffer(ser) cmd build_read_command(LIGHT_SLAVE, LIGHT_REGISTER_ADDR, 1) logger.debug(f发送光照命令从站{LIGHT_SLAVE}{cmd.hex()}) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 7: logger.error(f光照响应太短{len(resp)}字节{resp.hex()}) return None regs parse_read_response(resp, LIGHT_SLAVE, 1) if regs is None: logger.error(f解析光照失败{resp.hex()}) return None light_value regs[0] if light_value 0: logger.warning(光照值为0请检查传感器是否被遮挡或接线异常) logger.info(f光照强度{light_value} Lux) return light_value # 土壤传感器从站1 def read_soil_parameter(ser, name, addr, divide_factor1, retry2): 读取单个土壤参数支持重试 for attempt in range(retry): clear_serial_buffer(ser) cmd build_read_command(SOIL_SLAVE, addr, 1) ser.write(cmd) time.sleep(0.3) resp ser.read(50) if len(resp) 7: logger.error(f读取{name}响应太短{len(resp)}字节{resp.hex()}) continue regs parse_read_response(resp, SOIL_SLAVE, 1) if regs is None: logger.error(f解析{name}失败{resp.hex()}) continue raw regs[0] value raw / divide_factor if divide_factor ! 1 else raw logger.debug(f土壤{name}原始值{raw}转换后{value}) return value, False return None, True def read_soil_sensor(ser): 读取土壤所有参数湿度、温度、电导率、氮磷钾、酸碱度 soil_data { moisture: None, temp: None, ec: None, n: None, p: None, k: None, ph: None } tasks [ (湿度, SOIL_MOISTURE_ADDR, 10), (温度, SOIL_TEMP_ADDR, 10), (电导率, SOIL_EC_ADDR, 1), (氮含量, SOIL_N_ADDR, 1), (磷含量, SOIL_P_ADDR, 1), (钾含量, SOIL_K_ADDR, 1), (酸碱度, SOIL_PH_ADDR, 100) # PH值原始值/100 ] for name, addr, div in tasks: value, err read_soil_parameter(ser, name, addr, div, retry2) if not err: if name 湿度: soil_data[moisture] value elif name 温度: soil_data[temp] value elif name 电导率: soil_data[ec] value elif name 氮含量: soil_data[n] value elif name 磷含量: soil_data[p] value elif name 钾含量: soil_data[k] value elif name 酸碱度: soil_data[ph] value else: logger.warning(f土壤{name}读取失败) # 输出汇总 if soil_data[moisture] is not None: logger.info(f土壤湿度{soil_data[moisture]:.1f}%) if soil_data[temp] is not None: logger.info(f土壤温度{soil_data[temp]:.1f}°C) if soil_data[ec] is not None: logger.info(f土壤电导率{soil_data[ec]} μS/cm) if soil_data[n] is not None: logger.info(f土壤氮含量{soil_data[n]} mg/kg) if soil_data[p] is not None: logger.info(f土壤磷含量{soil_data[p]} mg/kg) if soil_data[k] is not None: logger.info(f土壤钾含量{soil_data[k]} mg/kg) if soil_data[ph] is not None: logger.info(f土壤酸碱度{soil_data[ph]:.2f}) return soil_data # 数据库操作 def init_db(): 初始化数据库创建需要的表 conn sqlite3.connect(DB_FILE) cursor conn.cursor() cursor.execute( CREATE TABLE IF NOT EXISTS sensor_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, temperature REAL, humidity REAL, light_intensity INTEGER ) ) cursor.execute( CREATE TABLE IF NOT EXISTS soil_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, moisture REAL, temperature REAL, conductivity INTEGER, nitrogen INTEGER, phosphorus INTEGER, potassium INTEGER, ph REAL ) ) # 确保列存在兼容旧表 cursor.execute(PRAGMA table_info(soil_data)) columns [col[1] for col in cursor.fetchall()] if ph not in columns: cursor.execute(ALTER TABLE soil_data ADD COLUMN ph REAL) logger.info(已为土壤数据表添加酸碱度列) cursor.execute(PRAGMA table_info(sensor_data)) columns [col[1] for col in cursor.fetchall()] if light_intensity not in columns: cursor.execute(ALTER TABLE sensor_data ADD COLUMN light_intensity INTEGER) logger.info(已为空气数据表添加光照列) conn.commit() conn.close() logger.info(f数据库初始化完成{DB_FILE}) def save_temp_humi_light_to_db(temp, humi, light): 保存空气温湿度和光照到数据库 try: conn sqlite3.connect(DB_FILE) cursor conn.cursor() ts datetime.now().strftime(%Y-%m-%d %H:%M:%S) cursor.execute( INSERT INTO sensor_data (timestamp, temperature, humidity, light_intensity) VALUES (?, ?, ?, ?) , (ts, temp, humi, light)) conn.commit() except Exception as e: logger.error(f保存空气数据失败{e}) finally: conn.close() def save_soil_to_db(soil_data): 保存土壤数据到数据库 try: conn sqlite3.connect(DB_FILE) cursor conn.cursor() ts datetime.now().strftime(%Y-%m-%d %H:%M:%S) cursor.execute( INSERT INTO soil_data (timestamp, moisture, temperature, conductivity, nitrogen, phosphorus, potassium, ph) VALUES (?, ?, ?, ?, ?, ?, ?, ?) , ( ts, soil_data.get(moisture), soil_data.get(temp), soil_data.get(ec), soil_data.get(n), soil_data.get(p), soil_data.get(k), soil_data.get(ph) )) conn.commit() except Exception as e: logger.error(f保存土壤数据失败{e}) finally: conn.close() # MQTT 发布字段名已中文化 def on_connect(client, userdata, flags, rc, propertiesNone): if rc 0: logger.info(MQTT 服务器连接成功) else: logger.error(fMQTT 连接失败返回码{rc}) def publish_all_data(client, temp, humi, light, soil_data): 将所有传感器数据发布到 MQTT 主题字段名为中文 if not client or not client.is_connected(): logger.warning(MQTT 未连接跳过数据发布) return payload { 设备: sensor_01, 时间: datetime.now().strftime(%Y-%m-%d %H:%M:%S) } if temp is not None: payload[温度] round(temp, 1) if humi is not None: payload[湿度] round(humi, 1) if light is not None: payload[光照] light soil_fields { 土壤湿度: soil_data.get(moisture), 土壤温度: soil_data.get(temp), 土壤电导率: soil_data.get(ec), 土壤氮含量: soil_data.get(n), 土壤磷含量: soil_data.get(p), 土壤钾含量: soil_data.get(k), 土壤酸碱度: soil_data.get(ph) } for key, val in soil_fields.items(): if val is not None: if isinstance(val, float): # 酸碱度保留两位小数其他保留一位 payload[key] round(val, 2 if key 土壤酸碱度 else 1) else: payload[key] val # 发布时 ensure_asciiFalse 保证中文正常显示 client.publish(MQTT_TOPIC, json.dumps(payload, ensure_asciiFalse), qos0) logger.debug(fMQTT 已发布{payload}) # 主程序 def main(): logger.info( * 50) logger.info(综合传感器采集程序支持空气温湿度、光照、土壤多参数) logger.info(f串口{SERIAL_PORT}波特率{BAUDRATE}) logger.info(f空气温湿度传感器地址{TEMP_HUMI_SLAVE}) logger.info(f光照传感器地址{LIGHT_SLAVE}寄存器 0x{LIGHT_REGISTER_ADDR:04X}) logger.info(f土壤传感器地址{SOIL_SLAVE}酸碱度寄存器 0x{SOIL_PH_ADDR:04X}) logger.info(fMQTT 服务器{MQTT_BROKER}:{MQTT_PORT}主题{MQTT_TOPIC}) logger.info( * 50) init_db() # 初始化 MQTT mqtt_client None try: try: mqtt_client mqtt.Client(callback_api_versionmqtt.CallbackAPIVersion.VERSION2) except AttributeError: mqtt_client mqtt.Client() mqtt_client.on_connect on_connect if MQTT_USER: mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() time.sleep(1) except Exception as e: logger.error(fMQTT 初始化失败{e}) # 打开串口 try: ser serial.Serial( portSERIAL_PORT, baudrateBAUDRATE, bytesize8, parityN, stopbits1, timeout1 ) logger.info(f串口 {SERIAL_PORT} 打开成功) except Exception as e: logger.error(f无法打开串口 {SERIAL_PORT}{e}) sys.exit(1) success 0 fail 0 light_zero_count 0 try: while True: logger.info(正在读取传感器数据...) temp, humi read_temp_humi(ser) light read_light(ser) if light is not None and light 0: light_zero_count 1 if light_zero_count 3: logger.warning(光照值连续为0请检查传感器是否连接或光照条件) else: light_zero_count 0 soil_data read_soil_sensor(ser) if temp is not None or humi is not None or light is not None or any(soil_data.values()): success 1 logger.info(f✓ 读取成功成功次数{success}失败次数{fail}) if temp is not None: logger.info(f 空气温度{temp:.1f}°C空气湿度{humi:.1f}%) if light is not None: logger.info(f 光照强度{light} Lux) save_temp_humi_light_to_db(temp, humi, light) if any(soil_data.values()): save_soil_to_db(soil_data) if mqtt_client: publish_all_data(mqtt_client, temp, humi, light, soil_data) else: fail 1 logger.warning(f✗ 读取失败成功次数{success}失败次数{fail}) logger.warning( 请检查传感器供电、接线、串口参数及从站地址是否正确) time.sleep(INTERVAL) except KeyboardInterrupt: logger.info(用户按 CtrlC 停止程序) finally: ser.close() if mqtt_client: mqtt_client.loop_stop() mqtt_client.disconnect() logger.info(程序已退出) if __name__ __main__: main()八、注意事项1. 串口号、波特率等参数需根据实际设备情况配置。2. 各传感器从站地址不可相同否则会导致地址冲突无法获取数据或程序运行错误。寄存器地址如重复但不在同一从站号下不会出错。3. MQTT服务器需提前配置好否则MQTTX无法获取上传数据或程序报错。4. 数据采集时间间隔可根据实际需求进行调整。