スズハドットコム

IT関連や3Dプリンタの記事、たまに生活のメモを書いていきます。

スマートメーターからBルート経由でデータを取得する

電力スマートメーターにはBルートというものがあります。
HEMS(Home Energy Management System)機器に、リアルタイムの消費電力や履歴データを送るルートです。
www.tepco.co.jp これを使って、ラズパイでデータ取得してみます。

用意するもの

  • ラズパイ
    今回はラズパイ3Bを使いました。zeroでも4Bでも大丈夫なはずです。

  • Wi-SUNモジュール RL7023 Stick-D/IPS
    スマートメーターとの通信はWi-SUNという規格でおこないます。そのためのUSB通信モジュールです。
    https://www.tessera.co.jp/rl7023stick-d_ips.htmlwww.tessera.co.jp

  • スマートメーターと通信するためのID/パスワード
    電力会社から発行してもらいます。
    東京電力の申し込み方法は、冒頭のリンクに説明があります。

コード

#!/usr/bin/env python
# -*- coding: utf-8 -*-



import sys
import serial
import time


# 接続処理
def connect():
    global rbid     # BルートID
    global rbpwd    # Bルートパスワード
    global ser      # シリアルポート
    global ipv6Addr # スマートメータのIPv6アドレス
    
    # Bルート認証パスワード設定
    ser_cmd_str="SKSETPWD C " + rbpwd + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    print(ser.readline().decode(), end="") # OK
    
    # Bルート認証ID設定
    ser_cmd_str="SKSETRBID " + rbid + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    print(ser.readline().decode(), end="") # OK
    
    scanDuration = 6; # スキャン時間
    scanRes = {}      # スキャン結果格納用
    
    # スキャン結果が見つかるまでループ
    while "Channel" not in scanRes :
        ser_cmd_str="SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
        ser.write(str.encode(ser_cmd_str))
        
        # イベント22=スキャン終了 が返ってくるまで読み込みを続ける
        scanEnd = False
        while not scanEnd :
            line = ser.readline().decode()
            print(line, end="")
            
            if line.startswith("EVENT 22") :
                # スキャン終了
                scanEnd = True
            elif line.startswith("  ") :
                # スキャン結果があれば、先頭にスペース2個あけてデータが返ってくるので取得する
                # 例
                #  Channel:39
                #  Channel Page:09
                #  Pan ID:FFFF
                #  Addr:FFFFFFFFFFFFFFFF
                #  LQI:A7
                #  PairID:FFFFFFFF
                cols = line.strip().split(':')
                scanRes[cols[0]] = cols[1]
        
        # スキャン終了しても何も見つからない場合は、スキャン時間を伸ばしてリトライ
        scanDuration+=1
        
        if 7 < scanDuration and "Channel" not in scanRes:
            # スキャン時間は14まで指定できるが、7でやめておく
            print("スキャンリトライオーバー")
            sys.exit()  #### 終了 ####
    
    # スキャン結果からChannelを設定。
    ser_cmd_str="SKSREG S2 " + scanRes["Channel"] + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    print(ser.readline().decode(), end="") # OK
    
    # スキャン結果からPan IDを設定
    ser_cmd_str="SKSREG S3 " + scanRes["Pan ID"] + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    print(ser.readline().decode(), end="") # OK
    
    # スキャン結果からMACアドレス(64bit)を取得し、IPV6リンクローカルアドレスに変換
    ser_cmd_str="SKLL64 " + scanRes["Addr"] + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    ipv6Addr = ser.readline().decode().strip()
    print(ipv6Addr)
    
    # PANA 接続シーケンスを開始
    ser_cmd_str="SKJOIN " + ipv6Addr + "\r\n"
    ser.write(str.encode(ser_cmd_str))
    print(ser.readline().decode(), end="") # エコーバック
    print(ser.readline().decode(), end="") # OK
    
    # PANA 接続完了待ち(イベント25が返ってくるのを待つ)
    bConnected = False
    while not bConnected :
        line = ser.readline().decode()
        print(line, end="")
        if line.startswith("EVENT 24") :
            print("PANA 接続失敗")
            sys.exit()  #### 終了 ####
        elif line.startswith("EVENT 25") :
            # 接続完了!
            bConnected = True

# 係数
# 積算電力量計測値 × 係数 × 積算電力量単位 = 実使用量 となる。
def getD3Factor():
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xD3"          # EPC
    echonetLiteFrame += b"\x00"          # PDC
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)
    bE5read = False
    while not bE5read:
        line = ser.readline().decode()         # ERXUDPが来るはず
        print(line, end="")
        if line.startswith("ERXUDP") :
            cols = line.strip().split(' ')
            res = cols[8]   # UDP受信データ部分
            #tid = res[4:4+4];
            seoj = res[8:8+6]
            #deoj = res[14,14+6]
            ESV = res[20:20+2]
            #OPC = res[22,22+2]
            if seoj == "028801" and ESV == "72" :
                # スマートメーター(028801)から来た応答(72)なら
                EPC = res[24:24+2]
                if EPC == "D3" :
                    # 内容が係数(D3)だったら
                    hexFactor = res[-8:]    # 最後の4バイト(16進数で8文字)が値
                    intFactor = int(hexFactor, 16)
                    bE5read = True
    return intFactor


# 有効桁数(1~8)
def getD7EffectiveDigit():
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xD7"          # EPC
    echonetLiteFrame += b"\x00"          # PDC
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)
    bE5read = False
    while not bE5read:
        line = ser.readline().decode()         # ERXUDPが来るはず
        print(line, end="")
        if line.startswith("ERXUDP") :
            cols = line.strip().split(' ')
            res = cols[8]   # UDP受信データ部分
            #tid = res[4:4+4];
            seoj = res[8:8+6]
            #deoj = res[14,14+6]
            ESV = res[20:20+2]
            #OPC = res[22,22+2]
            if seoj == "028801" and ESV == "72" :
                # スマートメーター(028801)から来た応答(72)なら
                EPC = res[24:24+2]
                if EPC == "D7" :
                    # 内容が有効桁数(D7)だったら
                    hexEffectiveDigit = res[-2:]    # 最後の1バイト(16進数で2文字)が値
                    intEffectiveDigit = int(hexEffectiveDigit, 16)
                    bE5read = True
    return intEffectiveDigit


# 積算電力量単位
# 0x00:1kWh
# 0x01:0.1kWh
# 0x02:0.01kWh
# 0x03:0.001kWh
# 0x04:0.0001kWh
# 0x0A:10kWh
# 0x0B:100kWh
# 0x0C:1000kWh
# 0x0D:10000kWh
def getE1Unit():
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xE1"          # EPC
    echonetLiteFrame += b"\x00"          # PDC
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)
    bE5read = False
    while not bE5read:
        line = ser.readline().decode()         # ERXUDPが来るはず
        print(line, end="")
        if line.startswith("ERXUDP") :
            cols = line.strip().split(' ')
            res = cols[8]   # UDP受信データ部分
            #tid = res[4:4+4];
            seoj = res[8:8+6]
            #deoj = res[14,14+6]
            ESV = res[20:20+2]
            #OPC = res[22,22+2]
            if seoj == "028801" and ESV == "72" :
                # スマートメーター(028801)から来た応答(72)なら
                EPC = res[24:24+2]
                if EPC == "E1" :
                    # 内容が単位(E1)だったら
                    hexUnit = res[-2:]    # 最後の1バイト(16進数で2文字)が値
                    bE5read = True
    return hexUnit


# 積算履歴収集日1(取得)
# 0:当日 1~99:指定した日数分前
def getE5HistCollectDays1():
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xE5"          # EPC
    echonetLiteFrame += b"\x00"          # PDC
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)
    bE5read = False
    while not bE5read:
        line = ser.readline().decode()         # ERXUDPが来るはず
        print(line, end="")
        if line.startswith("ERXUDP") :
            cols = line.strip().split(' ')
            res = cols[8]   # UDP受信データ部分
            #tid = res[4:4+4];
            seoj = res[8:8+6]
            #deoj = res[14,14+6]
            ESV = res[20:20+2]
            #OPC = res[22,22+2]
            if seoj == "028801" and ESV == "72" :
                # スマートメーター(028801)から来た応答(72)なら
                EPC = res[24:24+2]
                if EPC == "E5" :
                    # 内容が積算履歴収集日(E5)だったら
                    hexHistCollectDays = res[-2:]    # 最後の1バイト(16進数で2文字)が値
                    intHistCollectDays = int(hexHistCollectDays, 16)
                    bE5read = True
    return intHistCollectDays


# 積算履歴収集日1(設定)
# 0:当日 1~99:指定した日数分前
def setE5HistCollectDays1(days_before):
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x61"          # ESV(61:プロパティ値書き込み要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xE5"          # EPC
    echonetLiteFrame += b"\x01"          # PDC
    echonetLiteFrame += days_before.to_bytes(1,'big') # EDT(days_before 日前のデータを取得)(参考:EL p.3-9)
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)


# 積算電力量計測値履歴1
# 積算履歴収集日1の値、および
# 該当日の24時間48コマ分(0時0分~23時30分)の正方向の定時積算電力量計測値の履歴データを
# 時系列順に示す。
def getE2EnergyHist1():
    global ser # シリアルポート
    
    # ECHONET Lite フレーム作成
    echonetLiteFrame = b""
    echonetLiteFrame += b"\x10\x81"      # EHD
    echonetLiteFrame += b"\x00\x01"      # TID
    # ここから EDATA
    echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
    echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
    echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
    echonetLiteFrame += b"\x01"          # OPC(1個)
    echonetLiteFrame += b"\xE2"          # EPC
    echonetLiteFrame += b"\x00"          # PDC
    
    # コマンド送信
    ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
    ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
    ser.write(ser_cmd_byte)
    
    bE2read = False
    while not bE2read:
        line = ser.readline().decode()         # ERXUDPが来るはず
        print(line, end="")
        if line.startswith("ERXUDP") :
            cols = line.strip().split(' ')
            res = cols[8]   # UDP受信データ部分
            #tid = res[4:4+4];
            seoj = res[8:8+6]
            #deoj = res[14,14+6]
            ESV = res[20:20+2]
            #OPC = res[22,22+2]
            if seoj == "028801" and ESV == "72" :
                # スマートメーター(028801)から来た応答(72)なら
                EPC = res[24:24+2]
                if EPC == "E2" :
                    # 内容がE2だったら
                    # 積算履歴収集日1
                    days_before = str(int(res[30:30+2]))
                    print("積算履歴収集日1:{0}".format(days_before))
                    # 積算電力
                    offset = 8
                    len_data= offset*48
                    line = res[32:32+len_data]
                    flg = True
                    cnt = 0
                    body = ""
                    while flg:
                        start = cnt*offset
                        intPower = str(int(line[start:start+offset],16))
                        body = body +  intPower + ","
                        cnt += 1
                        if 47 < cnt :
                            flg = False
                    print("積算電力量計測値履歴")
                    print(body)
                    bE2read = True

########## 認証情報定義 ##########
# Bルート認証ID
rbid  = "0123456789ABCDEFGHIJKLMNOPQRSTUV"
# Bルート認証パスワード
rbpwd = "0123456789AB"

########## デバイス定義 ##########
# シリアルポートデバイス名
serialPortDev = '/dev/ttyUSB0'

########## 処理スタート ##########
# シリアルポート初期化
ser = serial.Serial(serialPortDev, 115200)

ser_cmd_str="SKRESET\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK

ser_cmd_str="SKINFO\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # バージョン

########## スマメとの接続処理 ##########

ipv6Addr = "" # スマートメータのIPv6アドレス
connect()

# これ以降、シリアル通信のタイムアウトを設定
ser.timeout = 2

# スマートメーターがインスタンスリスト通知を投げてくる
# (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
print(ser.readline(), end="") #無視

########## 各種設定値の取得 ##########
RC = getD3Factor()
print("係数:{0}".format(RC))

RC = getD7EffectiveDigit()
print("有効桁数:{0}".format(RC))

RC = getE1Unit()
print("単位:{0}".format(RC))

########## E5(積算履歴収集日1)設定処理 ##########
setE5HistCollectDays1(1)

########## E5(積算履歴収集日1)取得処理 ##########
RC = getE5HistCollectDays1()
print("積算履歴収集日1:{0}".format(RC))

########## E2(積算電力量計測値履歴)取得処理 ##########
getE2EnergyHist1()

ser.close()

実行例

$ python3 get_power_history.py
SKRESET
OK

(中略)

EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 00
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 0012 1081000102880105FF017201D30400000001
係数:1
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201D70106
有効桁数:6
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201E10101
単位:01
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000F
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 00
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000E 1081000102880105FF017101E500
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201E50101
積算履歴収集日1:1
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK

積算履歴収集日1:1
積算電力量計測値履歴
196909,196918,196932,196947,196959,196971,196983,196996,197006,197017,197025,197028,197030,197045,197053,197061,197070,197074,197076,197077,197079,197080,197081,197082,197084,197085,197086,197088,197089,197090,197091,197093,197094,197095,197096,197098,197099,197100,197109,197124,197133,197145,197162,197182,197195,197204,197211,197219,

無事、スマートメーターの各種設定値と、1日前の積算履歴(毎時00分、30分の積算電力量)が取得できました。
setE5HistCollectDays1(1) の引数を変更すれば、当日から99日前まで取得できます。

今後

取得した値を別のアプリに喰わせてグラフ化などしたい。

参考にさせて頂いたサイト

qiita.com

mina2.sama.to

echonet.jp