306.ラズパイ無双[30 OPC-UA asyncua]
初回:2023/4/5
Raspberry Pi (ラズベリーパイ、通称"ラズパイ")で何か作ってみようという新シリーズです。これから数回に分けて、ラズパイ上にOPC-UAサーバーと、クライアントを入れて、通信させるところまで行ってみたいと思います。今回は、非同期サーバーとクライアントの基本サンプルを少し改造して動作を見てみたいと思います。
P子「非同期サーバーって、難しいよね」※1
実際、まだよくわからないまま、サンプルを動かしているので、間違いがあればご指摘ください。
【目次】
1.async_server.py
まずは、非同期サーバーから見てみましょう。
≪参考1≫
https://devnote.tech/ja/2022/12/how-to-subscribe-data-change-on-variable-nodes-of-opc-ua-server-ja/#toc2
OPC UA サーバー Variable ノードのデータ変更をサブスクライブする方法
参考1 は、前回も示しました、非同期サーバーのオリジナルです。これを少し改造しています。
import asyncio from asyncua import Server from subprocess import getoutput # データ作成用に、CPU温度、クロック等を収集します。 ENDPOINT = 'opc.tcp://0.0.0.0:4840' NAMESPACE = 'http://examples.freeopcua.github.io' async def main() -> None: # Start a server. server = Server() await server.init() server.set_endpoint(ENDPOINT) idx = await server.register_namespace(NAMESPACE) # Create a node. myobj = await server.get_objects_node().add_object(idx, 'MyObject') myvar = await myobj.add_variable(idx, 'MyVariable', 0) await myvar.set_writable() # 書き込み許可 mycount = await myobj.add_variable(idx, "MyCount" , 0) myclock = await myobj.add_variable(idx, "MyClock" , "") mythermal = await myobj.add_variable(idx, "MyThermal" , "") # server.start を、後ろに持ってくる。 await server.start() print(f'Server started: {server}') MHZ = 1000*1000 count = 0 while True: count += 1 await mycount.write_value(count) # データ変更のイベント用に、各値の精度を削る。 # CPUクロックを取得する clock = getoutput('vcgencmd measure_clock arm') # frequency(48)=1500345728 clk = clock.split('=') # '=' で前後に分割 clk[1] = round( float(clk[1])/MHZ ) # MHZ に丸める clock = f'{clk[0]}={clk[1]} MHz' await myclock.write_value( clock ) # CPU温度を取得する # therm = getoutput('vcgencmd measure_temp') # temp=62.3'C temp = getoutput('cat /sys/class/thermal/thermal_zone0/temp') # 1000倍されている値だけの取得 temp = round( float(temp)/1000.0 ) # 小数点を丸める therm = f'temp={temp} ℃' await mythermal.write_value( therm ) # クライアント側で書き込まれた値を取り出す。 value = await myvar.read_value() # カーソル行をクリアしてカーソルを先頭に戻し、その場に表示する。 print(f'\033[2K\r{count} {clock} {therm} {value}',end='') await asyncio.sleep(5) # 判りやすく、5秒にする if __name__ == '__main__': asyncio.run(main())
サーバー側では、MyVariable にクライアントからの書き込みを受信するのと、MyCount、MyClock、MyThermal を送信しています。
2.async_client.py
≪参考2≫
https://devnote.tech/ja/2022/12/how-to-subscribe-data-change-on-variable-nodes-of-opc-ua-server-ja/#toc3
OPC UA サーバー Variable ノードのデータ変更をサブスクライブする方法
非同期クライアントも、同様に改造しています。
import asyncio from asyncua import Client, Node from asyncua.common.subscription import DataChangeNotif, SubHandler ENDPOINT = 'opc.tcp://XXX.XXX.XXX.XXX:4840' NAMESPACE = 'http://examples.freeopcua.github.io' class MyHandler(SubHandler): def __init__(self): self._queue = asyncio.Queue() def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None: self._queue.put_nowait([node, value, data]) # print(f'Data change notification was received and queued.') async def process(self) -> None: try: while True: # Get data in a queue. [node, value, data] = self._queue.get_nowait() pnode = await node.get_parent() for nd in await pnode.get_children() : val = await nd.get_value() key = await nd.get_path(as_string=True) # 一番最後の値 name = await nd.read_display_name() # idx の無いキー値 print( f'{nd} {key[-1]} {val} {name.Text}' ) val = value path = await node.get_path(as_string=True) key = path[3] name = await node.read_display_name() # *** Write your processing code *** # print(f'New value {value} of "{path}" was processed.') print( f'{node} {key} {val} {name.Text}' ) except asyncio.QueueEmpty: pass async def main() -> None: async with Client(url=ENDPOINT) as client: # Get a variable node. idx = await client.get_namespace_index(NAMESPACE) objnode = client.get_objects_node() # node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable']) node0 = await objnode.get_child([f'{idx}:MyObject', f'{idx}:MyVariable']) # node1 = await objnode.get_child([f'{idx}:MyObject', f'{idx}:MyCount']) node2 = await objnode.get_child([f'{idx}:MyObject', f'{idx}:MyClock']) # node3 = await objnode.get_child([f'{idx}:MyObject', f'{idx}:MyThermal']) # Subscribe data change. handler = MyHandler() subsc = await client.create_subscription(period=0, handler=handler) await subsc.subscribe_data_change(node2) # await subsc.subscribe_data_change(node3) # Process data change every 100ms value = 0 while True: value += 1 await node0.write_value(value) await handler.process() await asyncio.sleep(0.1) if __name__ == '__main__': asyncio.run(main())
クライアント側は、SubHandler を継承して、datachange_notification で変化時にデータ取得しています。
3.まとめ
とりあえず、非同期サーバーもクライアントも前回の通常版とも通信できます。そのあたりが、通信方式が標準化されている最大のメリットではないかと思います。
P子「最近、解説も手抜きしてない?」
そんなに説明するほどのコードでもないし、やはり自分で動かして、自分で色々と改造してみるのが一番理解が進むと思いますので、皆さんも一度動かしてみてはいかがでしょうか?
次回は、非同期イベントサーバーとクライアントを取り上げてみたいと思います。
ほな、さいなら
======= <<注釈>>=======
※1 P子「非同期サーバーって、難しいよね」
P子とは、私があこがれているツンデレPythonの仮想女性の心の声です。