スマートメーターからBルート経由でデータを取得する

電力スマートメーターにはBルートというものがあります。
HEMS(Home Energy Management System)機器に、リアルタイムの消費電力や履歴データを送るルートです。
www.tepco.co.jp
これを使って、ラズパイでデータ取得してみます。

用意するもの

  • ラズパイ
    今回はラズパイ3Bを使いました。zeroでも4Bでも大丈夫なはずです。

  • Wi-SUNモジュール RL7023 Stick-D/IPS
    スマートメーターとの通信はWi-SUNという規格でおこないます。そのためのUSB通信モジュールです。

    www.tessera.co.jp

  • スマートメーターと通信するためのID/パスワード
    電力会社から発行してもらいます。
    東京電力の申し込み方法は、冒頭のリンクに説明があります。

コード

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import serial
import time
# 接続処理
def connect():
global rbid     # BルートID
global rbpwd    # Bルートパスワード
global ser      # シリアルポート
global ipv6Addr # スマートメータのIPv6アドレス
# Bルート認証パスワード設定
ser_cmd_str="SKSETPWD C " + rbpwd + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
# Bルート認証ID設定
ser_cmd_str="SKSETRBID " + rbid + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
scanDuration = 6; # スキャン時間
scanRes = {}      # スキャン結果格納用
# スキャン結果が見つかるまでループ
while "Channel" not in scanRes :
ser_cmd_str="SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
ser.write(str.encode(ser_cmd_str))
# イベント22=スキャン終了 が返ってくるまで読み込みを続ける
scanEnd = False
while not scanEnd :
line = ser.readline().decode()
print(line, end="")
if line.startswith("EVENT 22") :
# スキャン終了
scanEnd = True
elif line.startswith("  ") :
# スキャン結果があれば、先頭にスペース2個あけてデータが返ってくるので取得する
# 例
#  Channel:39
#  Channel Page:09
#  Pan ID:FFFF
#  Addr:FFFFFFFFFFFFFFFF
#  LQI:A7
#  PairID:FFFFFFFF
cols = line.strip().split(':')
scanRes[cols[0]] = cols[1]
# スキャン終了しても何も見つからない場合は、スキャン時間を伸ばしてリトライ
scanDuration+=1
if 7 < scanDuration and "Channel" not in scanRes:
# スキャン時間は14まで指定できるが、7でやめておく
print("スキャンリトライオーバー")
sys.exit()  #### 終了 ####
# スキャン結果からChannelを設定。
ser_cmd_str="SKSREG S2 " + scanRes["Channel"] + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
# スキャン結果からPan IDを設定
ser_cmd_str="SKSREG S3 " + scanRes["Pan ID"] + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
# スキャン結果からMACアドレス(64bit)を取得し、IPV6リンクローカルアドレスに変換
ser_cmd_str="SKLL64 " + scanRes["Addr"] + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
ipv6Addr = ser.readline().decode().strip()
print(ipv6Addr)
# PANA 接続シーケンスを開始
ser_cmd_str="SKJOIN " + ipv6Addr + "\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
# PANA 接続完了待ち(イベント25が返ってくるのを待つ)
bConnected = False
while not bConnected :
line = ser.readline().decode()
print(line, end="")
if line.startswith("EVENT 24") :
print("PANA 接続失敗")
sys.exit()  #### 終了 ####
elif line.startswith("EVENT 25") :
# 接続完了!
bConnected = True
# 係数
# 積算電力量計測値 × 係数 × 積算電力量単位 = 実使用量 となる。
def getD3Factor():
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xD3"          # EPC
echonetLiteFrame += b"\x00"          # PDC
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
bE5read = False
while not bE5read:
line = ser.readline().decode()         # ERXUDPが来るはず
print(line, end="")
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8]   # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == "D3" :
# 内容が係数(D3)だったら
hexFactor = res[-8:]    # 最後の4バイト(16進数で8文字)が値
intFactor = int(hexFactor, 16)
bE5read = True
return intFactor
# 有効桁数(1~8)
def getD7EffectiveDigit():
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xD7"          # EPC
echonetLiteFrame += b"\x00"          # PDC
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
bE5read = False
while not bE5read:
line = ser.readline().decode()         # ERXUDPが来るはず
print(line, end="")
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8]   # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == "D7" :
# 内容が有効桁数(D7)だったら
hexEffectiveDigit = res[-2:]    # 最後の1バイト(16進数で2文字)が値
intEffectiveDigit = int(hexEffectiveDigit, 16)
bE5read = True
return intEffectiveDigit
# 積算電力量単位
# 0x00:1kWh
# 0x01:0.1kWh
# 0x02:0.01kWh
# 0x03:0.001kWh
# 0x04:0.0001kWh
# 0x0A:10kWh
# 0x0B:100kWh
# 0x0C:1000kWh
# 0x0D:10000kWh
def getE1Unit():
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xE1"          # EPC
echonetLiteFrame += b"\x00"          # PDC
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
bE5read = False
while not bE5read:
line = ser.readline().decode()         # ERXUDPが来るはず
print(line, end="")
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8]   # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == "E1" :
# 内容が単位(E1)だったら
hexUnit = res[-2:]    # 最後の1バイト(16進数で2文字)が値
bE5read = True
return hexUnit
# 積算履歴収集日1(取得)
# 0:当日 1~99:指定した日数分前
def getE5HistCollectDays1():
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xE5"          # EPC
echonetLiteFrame += b"\x00"          # PDC
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
bE5read = False
while not bE5read:
line = ser.readline().decode()         # ERXUDPが来るはず
print(line, end="")
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8]   # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == "E5" :
# 内容が積算履歴収集日(E5)だったら
hexHistCollectDays = res[-2:]    # 最後の1バイト(16進数で2文字)が値
intHistCollectDays = int(hexHistCollectDays, 16)
bE5read = True
return intHistCollectDays
# 積算履歴収集日1(設定)
# 0:当日 1~99:指定した日数分前
def setE5HistCollectDays1(days_before):
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x61"          # ESV(61:プロパティ値書き込み要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xE5"          # EPC
echonetLiteFrame += b"\x01"          # PDC
echonetLiteFrame += days_before.to_bytes(1,'big') # EDT(days_before 日前のデータを取得)(参考:EL p.3-9)
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
# 積算電力量計測値履歴1
# 積算履歴収集日1の値、および
# 該当日の24時間48コマ分(0時0分~23時30分)の正方向の定時積算電力量計測値の履歴データを
# 時系列順に示す。
def getE2EnergyHist1():
global ser # シリアルポート
# ECHONET Lite フレーム作成
echonetLiteFrame = b""
echonetLiteFrame += b"\x10\x81"      # EHD
echonetLiteFrame += b"\x00\x01"      # TID
# ここから EDATA
echonetLiteFrame += b"\x05\xFF\x01"  # SEOJ
echonetLiteFrame += b"\x02\x88\x01"  # DEOJ
echonetLiteFrame += b"\x62"          # ESV(62:プロパティ値読み出し要求)
echonetLiteFrame += b"\x01"          # OPC(1個)
echonetLiteFrame += b"\xE2"          # EPC
echonetLiteFrame += b"\x00"          # PDC
# コマンド送信
ser_cmd_str="SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(echonetLiteFrame))
ser_cmd_byte=str.encode(ser_cmd_str) + echonetLiteFrame
ser.write(ser_cmd_byte)
bE2read = False
while not bE2read:
line = ser.readline().decode()         # ERXUDPが来るはず
print(line, end="")
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8]   # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
if EPC == "E2" :
# 内容がE2だったら
# 積算履歴収集日1
days_before = str(int(res[30:30+2]))
print("積算履歴収集日1:{0}".format(days_before))
# 積算電力
offset = 8
len_data= offset*48
line = res[32:32+len_data]
flg = True
cnt = 0
body = ""
while flg:
start = cnt*offset
intPower = str(int(line[start:start+offset],16))
body = body +  intPower + ","
cnt += 1
if 47 < cnt :
flg = False
print("積算電力量計測値履歴")
print(body)
bE2read = True
########## 認証情報定義 ##########
# Bルート認証ID
rbid  = "0123456789ABCDEFGHIJKLMNOPQRSTUV"
# Bルート認証パスワード
rbpwd = "0123456789AB"
########## デバイス定義 ##########
# シリアルポートデバイス名
serialPortDev = '/dev/ttyUSB0'
########## 処理スタート ##########
# シリアルポート初期化
ser = serial.Serial(serialPortDev, 115200)
ser_cmd_str="SKRESET\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # OK
ser_cmd_str="SKINFO\r\n"
ser.write(str.encode(ser_cmd_str))
print(ser.readline().decode(), end="") # エコーバック
print(ser.readline().decode(), end="") # バージョン
########## スマメとの接続処理 ##########
ipv6Addr = "" # スマートメータのIPv6アドレス
connect()
# これ以降、シリアル通信のタイムアウトを設定
ser.timeout = 2
# スマートメーターがインスタンスリスト通知を投げてくる
# (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
print(ser.readline(), end="") #無視
########## 各種設定値の取得 ##########
RC = getD3Factor()
print("係数:{0}".format(RC))
RC = getD7EffectiveDigit()
print("有効桁数:{0}".format(RC))
RC = getE1Unit()
print("単位:{0}".format(RC))
########## E5(積算履歴収集日1)設定処理 ##########
setE5HistCollectDays1(1)
########## E5(積算履歴収集日1)取得処理 ##########
RC = getE5HistCollectDays1()
print("積算履歴収集日1:{0}".format(RC))
########## E2(積算電力量計測値履歴)取得処理 ##########
getE2EnergyHist1()
ser.close()

実行例

$ python3 get_power_history.py
SKRESET
OK
(中略)
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 00
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 0012 1081000102880105FF017201D30400000001
係数:1
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201D70106
有効桁数:6
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201E10101
単位:01
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000F
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 00
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000E 1081000102880105FF017101E500
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 000F 1081000102880105FF017201E50101
積算履歴収集日1:1
SKSENDTO 1 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 0E1A 1 000E
EVENT 21 FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX 01
OK
ERXUDP FE80:0000:0000:0000:XXXX:XXXX:XXXX:XXXX FE80:0000:0000:0000:YYYY:YYYY:YYYY:YYYY 0E1A 0E1A A06610FFFED325BD 1 00D0 1081000102880105FF017201E2C200010003012D0003013600030144000301530003015F0003016B00030177000301840003018E00030199000301A1000301A4000301A6000301B5000301BD000301C5000301CE000301D2000301D4000301D5000301D7000301D8000301D9000301DA000301DC000301DD000301DE000301E0000301E1000301E2000301E3000301E5000301E6000301E7000301E8000301EA000301EB000301EC000301F5000302040003020D000302190003022A0003023E0003024B000302540003025B00030263
積算履歴収集日1:1
積算電力量計測値履歴
196909,196918,196932,196947,196959,196971,196983,196996,197006,197017,197025,197028,197030,197045,197053,197061,197070,197074,197076,197077,197079,197080,197081,197082,197084,197085,197086,197088,197089,197090,197091,197093,197094,197095,197096,197098,197099,197100,197109,197124,197133,197145,197162,197182,197195,197204,197211,197219,

無事、スマートメーターの各種設定値と、1日前の積算履歴(毎時00分、30分の積算電力量)が取得できました。
setE5HistCollectDays1(1) の引数を変更すれば、当日から99日前まで取得できます。

今後

取得した値を別のアプリに喰わせてグラフ化などしたい。

参考にさせて頂いたサイト

qiita.com

mina2.sama.to

echonet.jp

コメント

タイトルとURLをコピーしました