Source code for vertex_protocol.engine_client.execute

from copy import deepcopy
import time
import requests
from functools import singledispatchmethod

from typing import Optional, Type, Union
from eth_account.signers.local import LocalAccount
from vertex_protocol.contracts.eip712.sign import (
    get_eip712_typed_data_digest,
    sign_eip712_typed_data,
    build_eip712_typed_data,
)
from vertex_protocol.engine_client.query import EngineQueryClient
from vertex_protocol.engine_client.types import (
    EngineClientOpts,
)
from vertex_protocol.engine_client.types.execute import (
    BaseParams,
    BurnLpParams,
    CancelAndPlaceParams,
    CancelOrdersParams,
    CancelProductOrdersParams,
    ExecuteParams,
    ExecuteRequest,
    ExecuteResponse,
    LinkSignerParams,
    LiquidateSubaccountParams,
    MintLpParams,
    OrderParams,
    PlaceMarketOrderParams,
    PlaceOrderParams,
    WithdrawCollateralParams,
    to_execute_request,
)
from vertex_protocol.contracts.types import VertexExecuteType
from vertex_protocol.engine_client.types.models import MarketLiquidity
from vertex_protocol.utils.bytes32 import subaccount_to_hex

from vertex_protocol.utils.exceptions import (
    BadStatusCodeException,
    ExecuteFailedException,
)
from vertex_protocol.utils.expiration import OrderType, get_expiration_timestamp
from vertex_protocol.utils.math import mul_x18, round_x18, to_x18
from vertex_protocol.utils.model import VertexBaseModel, is_instance_of_union
from vertex_protocol.utils.nonce import gen_order_nonce
from vertex_protocol.utils.subaccount import Subaccount, SubaccountParams


[docs]class EngineExecuteClient: """ Client class for executing operations against the off-chain engine. """
[docs] def __init__( self, opts: EngineClientOpts, querier: Optional[EngineQueryClient] = None ): """ Initialize the EngineExecuteClient with provided options. Args: opts (EngineClientOpts): Options for the client. querier (EngineQueryClient, optional): An EngineQueryClient instance. If not provided, a new one is created. """ self._querier = querier or EngineQueryClient(opts) self._opts: EngineClientOpts = EngineClientOpts.parse_obj(opts) self.url: str = self._opts.url self.session = requests.Session()
def _inject_owner_if_needed(self, params: Type[BaseParams]) -> Type[BaseParams]: """ Inject the owner if needed. Args: params (Type[BaseParams]): The parameters. Returns: Type[BaseParams]: The parameters with the owner injected if needed. """ if isinstance(params.sender, SubaccountParams): params.sender.subaccount_owner = ( params.sender.subaccount_owner or self.signer.address ) params.sender = params.serialize_sender(params.sender) return params def _inject_nonce_if_needed( self, params: Type[BaseParams], use_order_nonce: bool ) -> Type[BaseParams]: """ Inject the nonce if needed. Args: params (Type[BaseParams]): The parameters. Returns: Type[BaseParams]: The parameters with the nonce injected if needed. """ if params.nonce is not None: return params params.nonce = ( self.order_nonce() if use_order_nonce else self.tx_nonce(subaccount_to_hex(params.sender)) ) return params
[docs] def tx_nonce(self, sender: str) -> int: """ Get the transaction nonce. Used to perform executes such as `withdraw_collateral`. Returns: int: The transaction nonce. """ return int(self._querier.get_nonces(sender[:42]).tx_nonce)
[docs] def order_nonce(self, recv_time_ms: Optional[int] = None) -> int: """ Generate the order nonce. Used for oder placements and cancellations. Args: recv_time_ms (int, optional): Received time in milliseconds. Returns: int: The generated order nonce. """ return gen_order_nonce(recv_time_ms)
[docs] def prepare_execute_params(self, params, use_order_nonce: bool): """ Prepares the parameters for execution by ensuring that both owner and nonce are correctly set. Args: params (Type[BaseParams]): The original parameters. Returns: Type[BaseParams]: A copy of the original parameters with owner and nonce injected if needed. """ params = deepcopy(params) params = self._inject_owner_if_needed(params) params = self._inject_nonce_if_needed(params, use_order_nonce) return params
[docs] @singledispatchmethod def execute(self, params: Union[ExecuteParams, ExecuteRequest]) -> ExecuteResponse: """ Executes the operation defined by the provided parameters. Args: params (ExecuteParams): The parameters for the operation to execute. This can represent a variety of operations, such as placing orders, cancelling orders, and more. Returns: ExecuteResponse: The response from the executed operation. """ req: ExecuteRequest = ( params if is_instance_of_union(params, ExecuteRequest) else to_execute_request(params) # type: ignore ) return self._execute(req)
@execute.register def _(self, req: dict) -> ExecuteResponse: """ Overloaded method to execute the operation defined by the provided request. Args: req (dict): The request data for the operation to execute. Can be a dictionary or an instance of ExecuteRequest. Returns: ExecuteResponse: The response from the executed operation. """ parsed_req: ExecuteRequest = VertexBaseModel.parse_obj(req) # type: ignore return self._execute(parsed_req) def _execute(self, req: ExecuteRequest) -> ExecuteResponse: """ Internal method to execute the operation. Sends request to the server. Args: req (ExecuteRequest): The request data for the operation to execute. Returns: ExecuteResponse: The response from the executed operation. Raises: BadStatusCodeException: If the server response status code is not 200. ExecuteFailedException: If there's an error in the execution or the response status is not "success". """ res = self.session.post(f"{self.url}/execute", json=req.dict()) if res.status_code != 200: raise BadStatusCodeException(res.text) try: execute_res = ExecuteResponse(**res.json(), req=req.dict()) except Exception: raise ExecuteFailedException(res.text) if execute_res.status != "success": raise ExecuteFailedException(res.text) return execute_res @property def endpoint_addr(self) -> str: if self._opts.endpoint_addr is None: raise AttributeError("Endpoint address not set.") return self._opts.endpoint_addr @endpoint_addr.setter def endpoint_addr(self, addr: str) -> None: self._opts.endpoint_addr = addr @property def book_addrs(self) -> list[str]: if self._opts.book_addrs is None: raise AttributeError("Book addresses are not set.") return self._opts.book_addrs @book_addrs.setter def book_addrs(self, book_addrs: list[str]) -> None: self._opts.book_addrs = book_addrs @property def chain_id(self) -> int: if self._opts.chain_id is None: raise AttributeError("Chain ID is not set.") return self._opts.chain_id @chain_id.setter def chain_id(self, chain_id: Union[int, str]) -> None: self._opts.chain_id = int(chain_id) @property def signer(self) -> LocalAccount: if self._opts.signer is None: raise AttributeError("Signer is not set.") assert isinstance(self._opts.signer, LocalAccount) return self._opts.signer @signer.setter def signer(self, signer: LocalAccount) -> None: self._opts.signer = signer @property def linked_signer(self) -> LocalAccount: if self._opts.linked_signer is not None: assert isinstance(self._opts.linked_signer, LocalAccount) return self._opts.linked_signer if self._opts.signer is not None: assert isinstance(self._opts.signer, LocalAccount) return self.signer raise AttributeError("Signer is not set.") @linked_signer.setter def linked_signer(self, linked_signer: LocalAccount) -> None: if self._opts.signer is None: raise AttributeError( "Must set a `signer` first before setting `linked_signer`." ) self._opts.linked_signer = linked_signer
[docs] def book_addr(self, product_id: int) -> str: """ Retrieves the book address corresponding to the provided product ID. Needed for signing order placement executes for different products. Args: product_id (int): The ID of the product. Returns: str: The book address associated with the given product ID. Raises: ValueError: If the provided product_id is greater than or equal to the number of book addresses available. """ if product_id >= len(self.book_addrs): raise ValueError(f"Invalid product_id {product_id} provided.") return self.book_addrs[product_id]
[docs] def get_order_digest(self, order: OrderParams, product_id: int) -> str: """ Generates the order digest for a given order and product ID. Args: order (OrderParams): The order parameters. product_id (int): The ID of the product. Returns: str: The generated order digest. """ return self.build_digest( VertexExecuteType.PLACE_ORDER, order.dict(), self.book_addr(product_id), self.chain_id, )
def _sign( self, execute: VertexExecuteType, msg: dict, product_id: Optional[int] = None ) -> str: """ Internal method to create an EIP-712 signature for the given operation type and message. Args: execute (VertexExecuteType): The Vertex execute type to sign. msg (dict): The message to be signed. product_id (int, optional): Required for 'PLACE_ORDER' operation, specifying the product ID. Returns: str: The generated EIP-712 signature. Raises: ValueError: If the operation type is 'PLACE_ORDER' and no product_id is provided. Notes: The contract used for verification varies based on the operation type: - For 'PLACE_ORDER', it's derived from the book address associated with the product_id. - For other operations, it's the endpoint address. """ is_place_order = execute == VertexExecuteType.PLACE_ORDER if is_place_order and product_id is None: raise ValueError("Missing `product_id` to sign place_order execute") verifying_contract = ( self.book_addr(product_id) if is_place_order and product_id else self.endpoint_addr ) return self.sign( execute, msg, verifying_contract, self.chain_id, self.linked_signer )
[docs] def build_digest( self, execute: VertexExecuteType, msg: dict, verifying_contract: str, chain_id: int, ) -> str: """ Build an EIP-712 compliant digest from given parameters. Must provide the same input to build an EIP-712 typed data as the one provided for signing via `.sign(...)` Args: execute (VertexExecuteType): The Vertex execute type to build digest for. msg (dict): The EIP712 message. verifying_contract (str): The contract used for verification. chain_id (int): The network chain ID. Returns: str: The digest computed from the provided parameters. """ return get_eip712_typed_data_digest( build_eip712_typed_data(execute, msg, verifying_contract, chain_id) )
def _assert_book_not_empty( self, bids: list[MarketLiquidity], asks: list[MarketLiquidity], is_bid: bool ): book_is_empty = (is_bid and len(bids) == 0) or (not is_bid and len(asks) == 0) if book_is_empty: raise Exception("Orderbook is empty.")
[docs] def sign( self, execute: VertexExecuteType, msg: dict, verifying_contract: str, chain_id: int, signer: LocalAccount, ) -> str: """ Signs the EIP-712 typed data using the provided signer account. Args: execute (VertexExecuteType): The type of operation. msg (dict): The message to be signed. verifying_contract (str): The contract used for verification. chain_id (int): The network chain ID. signer (LocalAccount): The account used to sign the data. Returns: str: The generated EIP-712 signature. """ return sign_eip712_typed_data( typed_data=build_eip712_typed_data( execute, msg, verifying_contract, chain_id ), signer=signer, )
[docs] def place_order(self, params: PlaceOrderParams) -> ExecuteResponse: """ Execute a place order operation. Args: params (PlaceOrderParams): Parameters required for placing an order. The parameters include the order details and the product_id. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = PlaceOrderParams.parse_obj(params) params.order = self.prepare_execute_params(params.order, True) params.signature = params.signature or self._sign( VertexExecuteType.PLACE_ORDER, params.order.dict(), params.product_id ) return self.execute(params)
[docs] def place_market_order(self, params: PlaceMarketOrderParams) -> ExecuteResponse: """ Places an FOK order using top of the book price with provided slippage. Args: params (PlaceMarketOrderParams): Parameters required for placing a market order. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ orderbook = self._querier.get_market_liquidity(params.product_id, 1) is_bid = int(params.market_order.amount) > 0 self._assert_book_not_empty(orderbook.bids, orderbook.asks, is_bid) slippage = to_x18(params.slippage or 0.005) # defaults to 0.5% market_price_x18 = ( mul_x18(orderbook.bids[0][0], to_x18(1) + slippage) if is_bid else mul_x18(orderbook.asks[0][0], to_x18(1) - slippage) ) price_increment_x18 = self._querier._get_subaccount_product_position( subaccount_to_hex(params.market_order.sender), params.product_id ).product.book_info.price_increment_x18 order = OrderParams( sender=params.market_order.sender, amount=params.market_order.amount, nonce=params.market_order.nonce, priceX18=round_x18(market_price_x18, price_increment_x18), expiration=get_expiration_timestamp( OrderType.FOK, int(time.time()) + 1000, ), ) return self.place_order( PlaceOrderParams( # type: ignore product_id=params.product_id, order=order, spot_leverage=params.spot_leverage, signature=params.signature, ) )
[docs] def cancel_orders(self, params: CancelOrdersParams) -> ExecuteResponse: """ Execute a cancel orders operation. Args: params (CancelOrdersParams): Parameters required for canceling orders. The parameters include the order digests to be cancelled. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params(CancelOrdersParams.parse_obj(params), True) params.signature = params.signature or self._sign( VertexExecuteType.CANCEL_ORDERS, params.dict() ) return self.execute(params)
[docs] def cancel_product_orders( self, params: CancelProductOrdersParams ) -> ExecuteResponse: """ Execute a cancel product orders operation. Args: params (CancelProductOrdersParams): Parameters required for bulk canceling orders of specific products. The parameters include a list of product ids to bulk cancel orders for. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params( CancelProductOrdersParams.parse_obj(params), True ) params.signature = params.signature or self._sign( VertexExecuteType.CANCEL_PRODUCT_ORDERS, params.dict() ) return self.execute(params)
[docs] def cancel_and_place(self, params: CancelAndPlaceParams) -> ExecuteResponse: """ Execute a cancel and place operation. Args: params (CancelAndPlaceParams): Parameters required for cancel and place. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ cancel_orders: CancelOrdersParams = self.prepare_execute_params( CancelOrdersParams.parse_obj(params.cancel_orders), True ) cancel_orders.signature = cancel_orders.signature or self._sign( VertexExecuteType.CANCEL_ORDERS, cancel_orders.dict() ) place_order: PlaceOrderParams = PlaceOrderParams.parse_obj(params.place_order) place_order.order = self.prepare_execute_params(place_order.order, True) place_order.signature = place_order.signature or self._sign( VertexExecuteType.PLACE_ORDER, place_order.order.dict(), place_order.product_id, ) return self.execute( CancelAndPlaceParams(cancel_orders=cancel_orders, place_order=place_order) )
[docs] def withdraw_collateral(self, params: WithdrawCollateralParams) -> ExecuteResponse: """ Execute a withdraw collateral operation. Args: params (WithdrawCollateralParams): Parameters required for withdrawing collateral. The parameters include the collateral details. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params( WithdrawCollateralParams.parse_obj(params), False ) params.signature = params.signature or self._sign( VertexExecuteType.WITHDRAW_COLLATERAL, params.dict() ) return self.execute(params)
[docs] def liquidate_subaccount( self, params: LiquidateSubaccountParams ) -> ExecuteResponse: """ Execute a liquidate subaccount operation. Args: params (LiquidateSubaccountParams): Parameters required for liquidating a subaccount. The parameters include the liquidatee details. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params( LiquidateSubaccountParams.parse_obj(params), False ) params.signature = params.signature or self._sign( VertexExecuteType.LIQUIDATE_SUBACCOUNT, params.dict(), ) return self.execute(params)
[docs] def mint_lp(self, params: MintLpParams) -> ExecuteResponse: """ Execute a mint LP tokens operation. Args: params (MintLpParams): Parameters required for minting LP tokens. The parameters include the LP details. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params(MintLpParams.parse_obj(params), False) params.signature = params.signature or self._sign( VertexExecuteType.MINT_LP, params.dict(), ) return self.execute(params)
[docs] def burn_lp(self, params: BurnLpParams) -> ExecuteResponse: """ Execute a burn LP tokens operation. Args: params (BurnLpParams): Parameters required for burning LP tokens. The parameters include the LP details. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ params = self.prepare_execute_params(BurnLpParams.parse_obj(params), False) params.signature = params.signature or self._sign( VertexExecuteType.BURN_LP, params.dict(), ) return self.execute(params)
[docs] def close_position( self, subaccount: Subaccount, product_id: int ) -> ExecuteResponse: """ Execute a place order operation to close a position for the provided `product_id`. Attributes: subaccount (Subaccount): The subaccount to close position for. product_id (int): The ID of the product to close position for. Returns: ExecuteResponse: Response of the execution, including status and potential error message. """ subaccount = subaccount_to_hex(subaccount) position = self._querier._get_subaccount_product_position( subaccount, product_id ) balance, product = position.balance, position.product closing_spread_x18 = to_x18(0.005) closing_price_x18 = ( mul_x18(product.oracle_price_x18, to_x18(1) - closing_spread_x18) if int(balance.balance.amount) > 0 else mul_x18(product.oracle_price_x18, to_x18(1) + closing_spread_x18) ) return self.place_order( PlaceOrderParams( # type: ignore product_id=product_id, order=OrderParams( # type: ignore sender=subaccount, amount=-round_x18( balance.balance.amount, product.book_info.size_increment, ), priceX18=round_x18( closing_price_x18, product.book_info.price_increment_x18, ), expiration=get_expiration_timestamp( OrderType.FOK, int(time.time()) + 1000, ), ), ) )