On this page
Custom Pub/Sub Service
examples/trade_middleware_example/custom_pubsub_service_example.py
import threading
import time
from aitrados_api.trade_middleware.publisher import async_publisher_instance
"""
at first ,run "python run_trade_middleware_example.py"
"""
test_sub_topic="on_my_first_sub_topic"
def publish_service_example():
i = 0
while True:
msg = f"Hello {i} {time.time()}".encode()
async_publisher_instance.send_topic(test_sub_topic,msg)
i += 1
time.sleep(2)
def subscriber_client_example():
from aitrados_api.trade_middleware.subscriber import AsyncSubscriber
class MyAsyncSubscriber(AsyncSubscriber):
"""
Asynchronous function callback
"""
async def on_my_first_sub_topic(self, msg):
# my_first_sub_topic is from custom_pub_service_example.py
print(test_sub_topic, msg)
pass
subscriber = MyAsyncSubscriber()
subscriber.run()
subscriber.subscribe_topics(test_sub_topic)
if __name__ == "__main__":
threading.Thread(target=publish_service_example, daemon=True).start()
subscriber_client_example()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("close...")
examples/trade_middleware_example/custom_identity_example.py
from aitrados_api.trade_middleware.identity_mixin import *
class RpcFunction(RpcFunctionMixin):
LAST_OHLC_PRICE_ROW = "last_ohlc_price_row"
FIRST_OHLC_PRICE_ROW = "first_ohlc_price_row"
class Channel(ChannelMixin):
MY_TEST_SUB = b"my_test_sub"
MY_SECOND_SUB = b"my_second_sub"
class Identity(IdentityMixin):
backend_identity = "my_first_package"
fun = RpcFunction
channel = Channel
my_custom_identity_example=Identity