スマートメーターから消費電力データを取得してグラフ化する2026 ③pythonスクリプト作成編

python、mosquitto、Home Assistantでスマートメーターから消費電力データを取得してグラフを表示しよう、という企画です。
概要はこちらの記事をご覧ください。

この記事では、スマートメーターから消費電力データを取得するpythonスクリプトを作成します。

スマートメーター情報取得用pythonスクリプトの作成

python関連パッケージのインストール

pipで必要なパッケージをインストールします。
本来はvenvを使うべきなのかもしれませんが、サクッと済ませるために–break-system-packagesオプションにしています。

$ pip3 install pyserial paho-mqtt --break-system-packages

スクリプト作成

pythonスクリプトを作成します。

$ vi main.py
import serial
import time
import json
import paho.mqtt.client as mqtt
import sys

# ==========================================
# 設定エリア
# ==========================================
# Bルート認証IDとパスワード (電力会社から送られてきたもの)
RBID = "ここにIDを入れる" 
PWD  = "ここにパスワードを入れる"

SERIAL_PORT = "/dev/ttyUSB0"
BAUD_RATE = 115200

MQTT_BROKER = "localhost"
# トピック
MQTT_TOPIC_INSTANT = "home/smartmeter/instant"   # 瞬時電力 (W)
MQTT_TOPIC_TOTAL   = "home/smartmeter/total"     # 積算電力量 (kWh)

# ECHONET Lite フレーム定義
# 瞬時電力計測値 (E7) 要求
FRAME_E7 = b'\x10\x81\x00\x01\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x00'
# 積算電力量計測値 (E0) 要求
FRAME_E0 = b'\x10\x81\x00\x01\x05\xFF\x01\x02\x88\x01\x62\x01\xE0\x00'

# ==========================================
# シリアルポート設定
# ==========================================
ser = serial.Serial(
    SERIAL_PORT,
    BAUD_RATE,
    timeout=2,
    rtscts=False,
    dsrdtr=False,
    xonxoff=False
)

client = mqtt.Client()
ipv6_addr = ""

def mqtt_connect():
    try:
        client.connect(MQTT_BROKER, 1883, 60)
        client.loop_start()
    except Exception as e:
        print(f"MQTT Connection Error: {e}")

def read_response():
    try:
        line = ser.readline()
        return line.decode('utf-8', errors='ignore').strip()
    except:
        return ""

def send_command_wait_ok(cmd):
    #print(f"Send: {cmd}")
    ser.write(cmd.encode() + b'\r\n')
    start = time.time()
    while time.time() - start < 5:
        line = read_response()
        if line == "OK":
            #print("  -> OK")
            return True
    print(f"Timeout/Error: {cmd}")
    return False

def init_wisun():
    global ipv6_addr
    print("--- Wi-SUN 初期化 ---")
    ser.reset_input_buffer()

    # Echo Back OFF
    #print("Echo Back OFF設定...")
    ser.write(b"SKSREG SFE 0\r\n")
    time.sleep(0.5)
    ser.reset_input_buffer()

    if not send_command_wait_ok("SKSETPWD C " + PWD): return False
    if not send_command_wait_ok("SKSETRBID " + RBID): return False

    print("--- スキャン開始 ---")
    ser.write(b"SKSCAN 2 FFFFFFFF 6\r\n")

    scan_result = {}
    while True:
        line = read_response()
        if line.startswith("EVENT 22"): break
        if line.startswith("Channel:"): scan_result["Channel"] = line.split(":")[1]
        if line.startswith("Pan ID:"):  scan_result["PanID"] = line.split(":")[1]
        if line.startswith("Addr:"):    scan_result["Addr"] = line.split(":")[1]

    if "Addr" not in scan_result:
        print("スキャン失敗")
        return False

    #print(f"メーター発見: {scan_result}")

    ser.write(f"SKLL64 {scan_result['Addr']}\r\n".encode())
    while True:
        line = read_response()
        if line.startswith("FE80:"):
            ipv6_addr = line.strip()
            break

    #print(f"ターゲットIPv6: {ipv6_addr}")

    if not send_command_wait_ok(f"SKSREG S2 {scan_result['Channel']}"): return False
    if not send_command_wait_ok(f"SKSREG S3 {scan_result['PanID']}"): return False

    print("--- PANA認証開始 ---")
    ser.write(f"SKJOIN {ipv6_addr}\r\n".encode())

    while True:
        line = read_response()
        if line.startswith("EVENT 25"):
            #print(">>> PANA認証 成功 (EVENT 25) <<<")
            break
        if line.startswith("EVENT 24"):
            print("PANA認証に失敗しました")
            return False

    return True

def parse_echonet(data_hex):
    try:
        # --- 瞬時電力 (E7) の解析 ---
        idx_e7 = data_hex.find("E7")
        if idx_e7 != -1:
            if data_hex[idx_e7+2:idx_e7+4] == "04":
                hex_val = data_hex[idx_e7+4 : idx_e7+12]
                val = int(hex_val, 16)
                print(f"★ 瞬時電力: {val} W")
                client.publish(MQTT_TOPIC_INSTANT, json.dumps({"val": val}))

        # --- 積算電力量 (E0) の解析 ---
        idx_e0 = data_hex.find("E0")
        if idx_e0 != -1:
            if data_hex[idx_e0+2:idx_e0+4] == "04":
                hex_val = data_hex[idx_e0+4 : idx_e0+12]
                val = int(hex_val, 16)
                # 一般的なスマートメーターは係数1、単位0.1kWhが多い
                kwh_val = val * 0.1
                # 小数点第1位までに丸める
                kwh_val = round(kwh_val, 1)

                print(f"☆ 積算電力量: {kwh_val} kWh")
                client.publish(MQTT_TOPIC_TOTAL, json.dumps({"val": kwh_val}))

    except Exception as e:
        print(f"Parse Error: {e}")

def send_request(frame, name):
    ser.reset_input_buffer()

    # コマンド文字列
    cmd_str = f"SKSENDTO 1 {ipv6_addr} 0E1A 1 {len(frame):04X} "

    #print(f"\nリクエスト送信 ({name})...")
    ser.write(cmd_str.encode() + frame)

    start = time.time()
    got_ok = False

    while time.time() - start < 5:
        line = read_response()
        if line == "OK":
            got_ok = True

        if line.startswith("ERXUDP"):
            parts = line.split(" ")
            if len(parts) >= 9:
                parse_echonet(parts[8])
            return # データ取れたら即終了

    if not got_ok:
        print("  -> (警告) スマートメーターからOKが返ってきません")

def main_loop():
    print("--- データ取得ループ開始 ---")
    time.sleep(2)

    # 連続失敗カウント用
    fail_count = 0

    while True:
        # 1. 瞬時電力 (E7)
        # 連続失敗は、簡易的に「例外発生」や「タイムアウト」でカウント

        try:
            # 既存の処理...
            send_request(FRAME_E7, "E7:瞬時")
            time.sleep(2)
            send_request(FRAME_E0, "E0:積算")

            # ここまでエラーなく来たらカウンタリセット
            fail_count = 0

        except Exception as e:
            print(f"エラー発生: {e}")
            fail_count += 1

        # 連続で10回失敗(約5分間通信不能)したら、スクリプトを強制終了する
        # -> Systemdがそれを検知してサービス再起動
        if fail_count >= 10:
            print("!!! 連続失敗回数が上限に達しました。スクリプトを終了します !!!")
            sys.exit(1)

        #print("--- 待機 (30s) ---")
        time.sleep(30)

if __name__ == "__main__":
    mqtt_connect()
    if init_wisun():
        main_loop()
    else:
        ser.close()

スクリプト実行

スクリプト単体で実行してみます。
まず、radxaユーザがWi-SUNアダプタ(/dev/ttyUSB0)にアクセスできるよう、dialoutグループに所属させます。

$ sudo usermod -aG dialout $USER

グループ設定の反映のために一度ログアウトしてログインしなおします。

スクリプトを実行します。

$ python3 main.py

スマートメーターとの通信がうまくいけば、以下のような電力値が表示されます。

★ 瞬時電力: 1234 W
☆ 積算電力量: 123456.7 kWh

pythonスクリプトをサービス化する

Systemd サービスファイルの作成

以下の設定ファイルを新規作成します。
mosquittが起動した後にスクリプトが起動しないとエラーが発生するので、ポート1883が開くのを待つ処理を入れています。

$ sudo vi /etc/systemd/system/smartmeter.service
[Unit]
Description=Wi-SUN Smart Meter Reader
# docker.service の後に起動
After=docker.service network.target

[Service]
User=radxa
Group=radxa
WorkingDirectory=/home/radxa/smartmeter
ExecStart=/usr/bin/python3 /home/radxa/smartmeter/main.py

# ポート1883 (MQTT) が応答するようになるまで、起動前に最大60秒待機する
ExecStartPre=/bin/bash -c 'until timeout 1s bash -c "</dev/tcp/localhost/1883"; do sleep 1; done'

Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1

[Install]
WantedBy=multi-user.target

サービスの有効化・起動

サービスを有効化・起動します。

$ sudo systemctl daemon-reload
$ sudo systemctl enable smartmeter.service
$ sudo systemctl start smartmeter.service

起動後、状態を確認します。
状態がactive (running)で、画面下部のログにエラーが無ければとりあえずOKです。

$ sudo systemctl status smartmeter.service

「python3 main.py」で実行したときには画面に表示されていた出力は、サービス化後は以下のコマンドで確認できます。

$ sudo journalctl -u smartmeter.service -f

main.pyは瞬時電力/積算電力量を画面に表示し続けるようになっていますが、このままではSystemdのログが肥大化するので、main.pyの以下の2行をコメントアウトするとよいでしょう。

#print(f"★ 瞬時電力: {val} W")
#print(f"☆ 積算電力量: {kwh_val} kWh")

OSを再起動して、smartmeter.serviceが自動起動することを確認します。

$ sudo reboot

(再起動後に)

$ sudo systemctl status smartmeter.service

smartmeter.service が自動起動していたらOKです。

続きはこちら

コメント

タイトルとURLをコピーしました