python、mosquitto、Home Assistantでスマートメーターから消費電力データを取得してグラフを表示しよう、という企画です。
概要はこちらの記事をご覧ください。
この記事では、スマートメーターから消費電力データを取得するpythonスクリプトを作成します。
スマートメーター情報取得用pythonスクリプトの作成
python関連パッケージのインストール
pipで必要なパッケージをインストールします。
本来はvenvを使うべきなのかもしれませんが、サクッと済ませるために–break-system-packagesオプションにしています。
$ pip3 install pyserial paho-mqtt --break-system-packages
スクリプト作成
pythonスクリプトを作成します。
$ vi main.py
import serial
import time
import json
import paho.mqtt.client as mqtt
import sys
# ==========================================
# 設定エリア
# ==========================================
# Bルート認証IDとパスワード (電力会社から送られてきたもの)
RBID = "ここにIDを入れる"
PWD = "ここにパスワードを入れる"
SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 115200
MQTT_BROKER = "localhost"
# トピック
MQTT_TOPIC_INSTANT = "home/smartmeter/instant" # 瞬時電力 (W)
MQTT_TOPIC_TOTAL = "home/smartmeter/total" # 積算電力量 (kWh)
# ECHONET Lite フレーム定義
# 瞬時電力計測値 (E7) 要求
FRAME_E7 = b'\x10\x81\x00\x01\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x00'
# 積算電力量計測値 (E0) 要求
FRAME_E0 = b'\x10\x81\x00\x01\x05\xFF\x01\x02\x88\x01\x62\x01\xE0\x00'
# ==========================================
# シリアルポート設定
# ==========================================
ser = serial.Serial(
SERIAL_PORT,
BAUD_RATE,
timeout=2,
rtscts=False,
dsrdtr=False,
xonxoff=False
)
client = mqtt.Client()
ipv6_addr = ""
def mqtt_connect():
try:
client.connect(MQTT_BROKER, 1883, 60)
client.loop_start()
except Exception as e:
print(f"MQTT Connection Error: {e}")
def read_response():
try:
line = ser.readline()
return line.decode('utf-8', errors='ignore').strip()
except:
return ""
def send_command_wait_ok(cmd):
#print(f"Send: {cmd}")
ser.write(cmd.encode() + b'\r\n')
start = time.time()
while time.time() - start < 5:
line = read_response()
if line == "OK":
#print(" -> OK")
return True
print(f"Timeout/Error: {cmd}")
return False
def init_wisun():
global ipv6_addr
print("--- Wi-SUN 初期化 ---")
ser.reset_input_buffer()
# Echo Back OFF
#print("Echo Back OFF設定...")
ser.write(b"SKSREG SFE 0\r\n")
time.sleep(0.5)
ser.reset_input_buffer()
if not send_command_wait_ok("SKSETPWD C " + PWD): return False
if not send_command_wait_ok("SKSETRBID " + RBID): return False
print("--- スキャン開始 ---")
ser.write(b"SKSCAN 2 FFFFFFFF 6\r\n")
scan_result = {}
while True:
line = read_response()
if line.startswith("EVENT 22"): break
if line.startswith("Channel:"): scan_result["Channel"] = line.split(":")[1]
if line.startswith("Pan ID:"): scan_result["PanID"] = line.split(":")[1]
if line.startswith("Addr:"): scan_result["Addr"] = line.split(":")[1]
if "Addr" not in scan_result:
print("スキャン失敗")
return False
#print(f"メーター発見: {scan_result}")
ser.write(f"SKLL64 {scan_result['Addr']}\r\n".encode())
while True:
line = read_response()
if line.startswith("FE80:"):
ipv6_addr = line.strip()
break
#print(f"ターゲットIPv6: {ipv6_addr}")
if not send_command_wait_ok(f"SKSREG S2 {scan_result['Channel']}"): return False
if not send_command_wait_ok(f"SKSREG S3 {scan_result['PanID']}"): return False
print("--- PANA認証開始 ---")
ser.write(f"SKJOIN {ipv6_addr}\r\n".encode())
while True:
line = read_response()
if line.startswith("EVENT 25"):
#print(">>> PANA認証 成功 (EVENT 25) <<<")
break
if line.startswith("EVENT 24"):
print("PANA認証に失敗しました")
return False
return True
def parse_echonet(data_hex):
try:
# --- 瞬時電力 (E7) の解析 ---
idx_e7 = data_hex.find("E7")
if idx_e7 != -1:
if data_hex[idx_e7+2:idx_e7+4] == "04":
hex_val = data_hex[idx_e7+4 : idx_e7+12]
val = int(hex_val, 16)
print(f"★ 瞬時電力: {val} W")
client.publish(MQTT_TOPIC_INSTANT, json.dumps({"val": val}))
# --- 積算電力量 (E0) の解析 ---
idx_e0 = data_hex.find("E0")
if idx_e0 != -1:
if data_hex[idx_e0+2:idx_e0+4] == "04":
hex_val = data_hex[idx_e0+4 : idx_e0+12]
val = int(hex_val, 16)
# 一般的なスマートメーターは係数1、単位0.1kWhが多い
kwh_val = val * 0.1
# 小数点第1位までに丸める
kwh_val = round(kwh_val, 1)
print(f"☆ 積算電力量: {kwh_val} kWh")
client.publish(MQTT_TOPIC_TOTAL, json.dumps({"val": kwh_val}))
except Exception as e:
print(f"Parse Error: {e}")
def send_request(frame, name):
ser.reset_input_buffer()
# コマンド文字列
cmd_str = f"SKSENDTO 1 {ipv6_addr} 0E1A 1 {len(frame):04X} "
#print(f"\nリクエスト送信 ({name})...")
ser.write(cmd_str.encode() + frame)
start = time.time()
got_ok = False
while time.time() - start < 5:
line = read_response()
if line == "OK":
got_ok = True
if line.startswith("ERXUDP"):
parts = line.split(" ")
if len(parts) >= 9:
parse_echonet(parts[8])
return # データ取れたら即終了
if not got_ok:
print(" -> (警告) スマートメーターからOKが返ってきません")
def main_loop():
print("--- データ取得ループ開始 ---")
time.sleep(2)
# 連続失敗カウント用
fail_count = 0
while True:
# 1. 瞬時電力 (E7)
# 連続失敗は、簡易的に「例外発生」や「タイムアウト」でカウント
try:
# 既存の処理...
send_request(FRAME_E7, "E7:瞬時")
time.sleep(2)
send_request(FRAME_E0, "E0:積算")
# ここまでエラーなく来たらカウンタリセット
fail_count = 0
except Exception as e:
print(f"エラー発生: {e}")
fail_count += 1
# 連続で10回失敗(約5分間通信不能)したら、スクリプトを強制終了する
# -> Systemdがそれを検知してサービス再起動
if fail_count >= 10:
print("!!! 連続失敗回数が上限に達しました。スクリプトを終了します !!!")
sys.exit(1)
#print("--- 待機 (30s) ---")
time.sleep(30)
if __name__ == "__main__":
mqtt_connect()
if init_wisun():
main_loop()
else:
ser.close()
スクリプト実行
スクリプト単体で実行してみます。
まず、radxaユーザがWi-SUNアダプタ(/dev/ttyUSB0)にアクセスできるよう、dialoutグループに所属させます。
$ sudo usermod -aG dialout $USER
グループ設定の反映のために一度ログアウトしてログインしなおします。
スクリプトを実行します。
$ python3 main.py
スマートメーターとの通信がうまくいけば、以下のような電力値が表示されます。
★ 瞬時電力: 1234 W
☆ 積算電力量: 123456.7 kWh
pythonスクリプトをサービス化する
Systemd サービスファイルの作成
以下の設定ファイルを新規作成します。
mosquittが起動した後にスクリプトが起動しないとエラーが発生するので、ポート1883が開くのを待つ処理を入れています。
$ sudo vi /etc/systemd/system/smartmeter.service
[Unit]
Description=Wi-SUN Smart Meter Reader
# docker.service の後に起動
After=docker.service network.target
[Service]
User=radxa
Group=radxa
WorkingDirectory=/home/radxa/smartmeter
ExecStart=/usr/bin/python3 /home/radxa/smartmeter/main.py
# ポート1883 (MQTT) が応答するようになるまで、起動前に最大60秒待機する
ExecStartPre=/bin/bash -c 'until timeout 1s bash -c "</dev/tcp/localhost/1883"; do sleep 1; done'
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
サービスの有効化・起動
サービスを有効化・起動します。
$ sudo systemctl daemon-reload
$ sudo systemctl enable smartmeter.service
$ sudo systemctl start smartmeter.service
起動後、状態を確認します。
状態がactive (running)で、画面下部のログにエラーが無ければとりあえずOKです。
$ sudo systemctl status smartmeter.service
「python3 main.py」で実行したときには画面に表示されていた出力は、サービス化後は以下のコマンドで確認できます。
$ sudo journalctl -u smartmeter.service -f
main.pyは瞬時電力/積算電力量を画面に表示し続けるようになっていますが、このままではSystemdのログが肥大化するので、main.pyの以下の2行をコメントアウトするとよいでしょう。
#print(f"★ 瞬時電力: {val} W")
#print(f"☆ 積算電力量: {kwh_val} kWh")
OSを再起動して、smartmeter.serviceが自動起動することを確認します。
$ sudo reboot
(再起動後に)
$ sudo systemctl status smartmeter.service
smartmeter.service が自動起動していたらOKです。


コメント