308.ラズパイ無双[31 OPC-UA events]
初回:2023/4/19
Raspberry Pi (ラズベリーパイ、通称"ラズパイ")で何か作ってみようという新シリーズです。これから数回に分けて、ラズパイ上にOPC-UAサーバーと、クライアントを入れて、通信させるところまで行ってみたいと思います。今回は、非同期サーバーとクライアントにおける、イベント系のサンプルを少し改造して動作を見てみたいと思います。そして、今回をもって、Pythonによる OPC-UA関係の記事は、終了となります。
P子「案外続いたわね」※1
もっと、さらっと終わらすつもりでしたが、結構色々な手法があることが判り、私も困惑しながら書いていました。どこかで整理して、一番良い形を見つけてご紹介したいと思います。
P子「期待してないで、待ってるわ」
【目次】
1.events_server.py
まずは、元記事のご紹介です。
≪参考1≫
https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/server-events.py
以下に解説するソースは、参考1 を少しだけ修正したものです。元記事は、server-events.py というファイル名ですが、改造版ソースは、events_server.py として、特徴名を先に持ってきています。別にどちらでも問題ないでしょう。
import asyncio from asyncua import ua from asyncua.server import Server, EventGenerator ENDPOINT = 'opc.tcp://0.0.0.0:4840/freeopcua/server/' NAMESPACE = 'http://examples.freeopcua.github.io' async def main(): server = Server() await server.init() server.set_endpoint(ENDPOINT) idx = await server.register_namespace(NAMESPACE) # populating our address space myobj = await server.nodes.objects.add_object(idx, "MyObject") # Creating a custom event: Approach 1 # The custom event object automatically will have members from its parent (BaseEventType) etype = await server.create_custom_event_type( idx, 'MyFirstEvent', ua.ObjectIds.BaseEventType, [('MyNumericProperty', ua.VariantType.Float), ('MyStringProperty', ua.VariantType.String)] ) myevgen = await server.get_event_generator(etype, myobj) # Creating a custom event: Approach 2 custom_etype = await server.nodes.base_event_type.add_object_type(2, 'MySecondEvent') await custom_etype.add_property(2, 'MyIntProperty', ua.Variant(0, ua.VariantType.Int32)) await custom_etype.add_property(2, 'MyBoolProperty', ua.Variant(True, ua.VariantType.Boolean)) mysecondevgen = await server.get_event_generator(custom_etype, myobj) async with server: count = 0 while True: await asyncio.sleep(1) myevgen.event.Message = ua.LocalizedText("MyFirstEvent %d" % count) myevgen.event.Severity = count myevgen.event.MyNumericProperty = count myevgen.event.MyStringProperty = "Property %d" % count await myevgen.trigger() await mysecondevgen.trigger(message="MySecondEvent %d" % count) count += 1 if __name__ == "__main__": asyncio.run(main())
使用パッケージは、asyncua と asyncio ですが、実態のServer が async_server.py と、events_server.py では異なります。
2.events_client.py
≪参考2≫
https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/client-events.py
import asyncio from asyncua import Client class SubHandler: def event_notification(self, event): print(str(event.EventId)) print(str(event.EventType)) print(str(event.LocalTime)) print(event.Message.Text) print(event.MyNumericProperty) print(event.MyStringProperty) print(event.ReceiveTime) print(event.Severity) print(event.SourceName) print(str(event.SourceNode)) print(event.Time) print('-------------------------------------') return ENDPOINT = 'opc.tcp://XXX.XXX.XXX.XXX:4840/freeopcua/server/' async def main(): async with Client(url=ENDPOINT) as client: # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects print("Objects node is: ", client.nodes.root) # Now getting a variable node using its browse path obj = await client.nodes.root.get_child(["0:Objects", "2:MyObject"]) print("MyObject is: ", obj) myevent = await client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"]) print("MyFirstEventType is: ", myevent) msclt = SubHandler() sub = await client.create_subscription(period=100, handler=msclt) handle = await sub.subscribe_events(obj, myevent) await asyncio.sleep(10) await sub.unsubscribe(handle) await sub.delete() if __name__ == "__main__": asyncio.run(main())
クライアントのイベント版のサンプルは、前回の非同期クライアントと同様に、ハンドラーを使用しています。
3.まとめ
python版 OPC-UA関係のコラムは、今回で終了です。これらを応用して色々と試していきますが、まだ、実装が終わっていません。さらに言うと、これらのテスト環境ではなく、本物の OPC-UAサーバーとの通信を行いたいと思っていますが、現状、そのようなサーバーを持っていませんので、またの機会に評価してみたいと思います。
P子「まあ、サンプルを少し修正するくらいじゃわからないものね」
ラズパイ無双も、テーマが尽きてきましたので、ちょっと考えます。
ほな、さいなら
======= <<注釈>>=======
※1 P子「案外続いたわね」
P子とは、私があこがれているツンデレPythonの仮想女性の心の声です。