On this page
Custom RPC Backend Service
examples/trade_middleware_example/custom_rpc_backend_service_example.py
import threading
import time
from aitrados_api.common_lib.common import run_asynchronous_function
from aitrados_api.common_lib.response_format import ErrorResponse, UnifiedResponse
from aitrados_api.trade_middleware.backend_service import BackendService
from aitrados_api.trade_middleware.request import FrontendRequest
from examples.trade_middleware_example.custom_identity_example import my_custom_identity_example
from aitrados_api.trade_middleware.response import AsyncBackendResponse
"""
at first ,run "python run_trade_middleware_example.py"
"""
class CustomRpcBackendService(BackendService):
IDENTITY=my_custom_identity_example
def __init__(self):
super().__init__()
def last_ohlc_price_row(self,*args,**kwargs):
# you can get other rpc data
"""
from aitrados_api.common_lib.contant import SchemaAsset, IntervalName
from aitrados_api.trade_middleware.request import FrontendRequest,AsyncFrontendRequest
from aitrados_api.trade_middleware_service.trade_middleware_identity import aitrados_api_identity
params = {
"schema_asset": SchemaAsset.CRYPTO,
"country_symbol": "GLOBAL:BTCUSD",
"interval": IntervalName.M60,
"limit": 1
}
ohlcs = FrontendRequest.call_sync(
aitrados_api_identity.backend_identity,
aitrados_api_identity.fun.OHLCS,
**params,
)
"""
return UnifiedResponse(result="price is 456").model_dump_json()
def first_ohlc_price_row(self,*args,**kwargs):
return UnifiedResponse(result="price is 789").model_dump_json()
def accept(self,function_name:str,*args,**kwargs):
if function_name==my_custom_identity_example.fun.FIRST_OHLC_PRICE_ROW.value:
return self.first_ohlc_price_row(*args,**kwargs)
if function_name==my_custom_identity_example.fun.LAST_OHLC_PRICE_ROW.value:
return self.last_ohlc_price_row(*args,**kwargs)
else:
return ErrorResponse(message=F"Unknown request '{function_name}'").model_dump_json()
async def a_accept(self,function_name:str,*args,**kwargs):
if function_name not in my_custom_identity_example.fun.get_array():
return ErrorResponse(message=F"Unknown request '{function_name}'").model_dump_json()
#to sync function.If the concurrency is high, we recommend using asynchronous functions
return self.accept(function_name,*args,**kwargs)
def run_service():
service = AsyncBackendResponse(CustomRpcBackendService())
run_asynchronous_function(service.init())
def test_request():
print("test req")
result = FrontendRequest.call_sync(
CustomRpcBackendService.IDENTITY.backend_identity,
CustomRpcBackendService.IDENTITY.fun.LAST_OHLC_PRICE_ROW,
full_symbol="CRYPTO:GLOBAL:BTCUSD",
timeout=30 # 10s
)
print(result)
if __name__ == "__main__":
#run rpc backend
router_thread = threading.Thread(target=run_service, daemon = True).start()
time.sleep(5)
#request
test_request()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("closing...")
examples/trade_middleware_example/custom_identity_example.py
from aitrados_api.trade_middleware.identity_mixin import *
class RpcFunction(RpcFunctionMixin):
LAST_OHLC_PRICE_ROW = "last_ohlc_price_row"
FIRST_OHLC_PRICE_ROW = "first_ohlc_price_row"
class Channel(ChannelMixin):
MY_TEST_SUB = b"my_test_sub"
MY_SECOND_SUB = b"my_second_sub"
class Identity(IdentityMixin):
backend_identity = "my_first_package"
fun = RpcFunction
channel = Channel
my_custom_identity_example=Identity