Переглянути джерело

Add mempool program subscription (#4)

buffalu 2 роки тому
батько
коміт
882893f726

+ 27 - 2
jito_searcher_client/examples/searcher-cli.py

@@ -18,6 +18,9 @@ from jito_searcher_client.generated.searcher_pb2 import (
     NextScheduledLeaderResponse,
     PendingTxSubscriptionRequest,
     SendBundleRequest,
+    MempoolSubscription,
+    WriteLockedAccountSubscriptionV0,
+    ProgramSubscriptionV0,
 )
 from jito_searcher_client.generated.searcher_pb2_grpc import SearcherServiceStub
 from jito_searcher_client.searcher import get_searcher_client
@@ -53,14 +56,35 @@ def cli(
 @click.argument("accounts", required=True, nargs=-1)
 def mempool_accounts(client: SearcherServiceStub, accounts: List[str]):
     """
-    Stream pending transactions from write-locked accounts.
+    Stream transactions from the mempool if they write-lock one of the provided accounts
     """
     leader: NextScheduledLeaderResponse = client.GetNextScheduledLeader(NextScheduledLeaderRequest())
     print(
         f"next scheduled leader is {leader.next_leader_identity} in {leader.next_leader_slot - leader.current_slot} slots"
     )
 
-    for notification in client.SubscribePendingTransactions(PendingTxSubscriptionRequest(accounts=accounts)):
+    for notification in client.SubscribeMempool(
+        MempoolSubscription(wla_v0_sub=WriteLockedAccountSubscriptionV0(accounts=accounts))
+    ):
+        for packet in notification.transactions:
+            print(VersionedTransaction.from_bytes(packet.data))
+
+
+@click.command("mempool-programs")
+@click.pass_obj
+@click.argument("programs", required=True, nargs=-1)
+def mempool_programs(client: SearcherServiceStub, programs: List[str]):
+    """
+    Stream transactions from the mempool if they mention one of the provided programs
+    """
+    leader: NextScheduledLeaderResponse = client.GetNextScheduledLeader(NextScheduledLeaderRequest())
+    print(
+        f"next scheduled leader is {leader.next_leader_identity} in {leader.next_leader_slot - leader.current_slot} slots"
+    )
+
+    for notification in client.SubscribeMempool(
+        MempoolSubscription(program_v0_sub=ProgramSubscriptionV0(programs=programs))
+    ):
         for packet in notification.transactions:
             print(VersionedTransaction.from_bytes(packet.data))
 
@@ -204,6 +228,7 @@ def send_bundle(
 
 if __name__ == "__main__":
     cli.add_command(mempool_accounts)
+    cli.add_command(mempool_programs)
     cli.add_command(next_scheduled_leader)
     cli.add_command(connected_leaders)
     cli.add_command(tip_accounts)

Різницю між файлами не показано, бо вона завелика
+ 0 - 0
jito_searcher_client/jito_searcher_client/generated/block_engine_pb2.py


+ 26 - 0
jito_searcher_client/jito_searcher_client/generated/block_engine_pb2.pyi

@@ -148,6 +148,32 @@ class AccountsOfInterestUpdate(google.protobuf.message.Message):
 
 global___AccountsOfInterestUpdate = AccountsOfInterestUpdate
 
+@typing_extensions.final
+class ProgramsOfInterestRequest(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___ProgramsOfInterestRequest = ProgramsOfInterestRequest
+
+@typing_extensions.final
+class ProgramsOfInterestUpdate(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROGRAMS_FIELD_NUMBER: builtins.int
+    @property
+    def programs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    def __init__(
+        self,
+        *,
+        programs: collections.abc.Iterable[builtins.str] | None = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["programs", b"programs"]) -> None: ...
+
+global___ProgramsOfInterestUpdate = ProgramsOfInterestUpdate
+
 @typing_extensions.final
 class ExpiringPacketBatch(google.protobuf.message.Message):
     """A series of packets with an expiration attached to them.

+ 33 - 0
jito_searcher_client/jito_searcher_client/generated/block_engine_pb2_grpc.py

@@ -155,6 +155,11 @@ class BlockEngineRelayerStub(object):
                 request_serializer=block__engine__pb2.AccountsOfInterestRequest.SerializeToString,
                 response_deserializer=block__engine__pb2.AccountsOfInterestUpdate.FromString,
                 )
+        self.SubscribeProgramsOfInterest = channel.unary_stream(
+                '/block_engine.BlockEngineRelayer/SubscribeProgramsOfInterest',
+                request_serializer=block__engine__pb2.ProgramsOfInterestRequest.SerializeToString,
+                response_deserializer=block__engine__pb2.ProgramsOfInterestUpdate.FromString,
+                )
         self.StartExpiringPacketStream = channel.stream_stream(
                 '/block_engine.BlockEngineRelayer/StartExpiringPacketStream',
                 request_serializer=block__engine__pb2.PacketBatchUpdate.SerializeToString,
@@ -176,6 +181,12 @@ class BlockEngineRelayerServicer(object):
         context.set_details('Method not implemented!')
         raise NotImplementedError('Method not implemented!')
 
+    def SubscribeProgramsOfInterest(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
     def StartExpiringPacketStream(self, request_iterator, context):
         """Validators can subscribe to packets from the relayer and receive a multiplexed signal that contains a mixture
         of packets and heartbeats.
@@ -195,6 +206,11 @@ def add_BlockEngineRelayerServicer_to_server(servicer, server):
                     request_deserializer=block__engine__pb2.AccountsOfInterestRequest.FromString,
                     response_serializer=block__engine__pb2.AccountsOfInterestUpdate.SerializeToString,
             ),
+            'SubscribeProgramsOfInterest': grpc.unary_stream_rpc_method_handler(
+                    servicer.SubscribeProgramsOfInterest,
+                    request_deserializer=block__engine__pb2.ProgramsOfInterestRequest.FromString,
+                    response_serializer=block__engine__pb2.ProgramsOfInterestUpdate.SerializeToString,
+            ),
             'StartExpiringPacketStream': grpc.stream_stream_rpc_method_handler(
                     servicer.StartExpiringPacketStream,
                     request_deserializer=block__engine__pb2.PacketBatchUpdate.FromString,
@@ -229,6 +245,23 @@ class BlockEngineRelayer(object):
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
 
+    @staticmethod
+    def SubscribeProgramsOfInterest(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_stream(request, target, '/block_engine.BlockEngineRelayer/SubscribeProgramsOfInterest',
+            block__engine__pb2.ProgramsOfInterestRequest.SerializeToString,
+            block__engine__pb2.ProgramsOfInterestUpdate.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
     @staticmethod
     def StartExpiringPacketStream(request_iterator,
             target,

+ 10 - 0
jito_searcher_client/jito_searcher_client/generated/block_engine_pb2_grpc.pyi

@@ -72,6 +72,10 @@ class BlockEngineRelayerStub:
     / For all transactions the relayer receives, it forwards transactions to the block engine which write-lock
     / any of the accounts in the AOI.
     """
+    SubscribeProgramsOfInterest: grpc.UnaryStreamMultiCallable[
+        block_engine_pb2.ProgramsOfInterestRequest,
+        block_engine_pb2.ProgramsOfInterestUpdate,
+    ]
     StartExpiringPacketStream: grpc.StreamStreamMultiCallable[
         block_engine_pb2.PacketBatchUpdate,
         block_engine_pb2.StartExpiringPacketStreamResponse,
@@ -99,6 +103,12 @@ class BlockEngineRelayerServicer(metaclass=abc.ABCMeta):
         / any of the accounts in the AOI.
         """
     @abc.abstractmethod
+    def SubscribeProgramsOfInterest(
+        self,
+        request: block_engine_pb2.ProgramsOfInterestRequest,
+        context: grpc.ServicerContext,
+    ) -> collections.abc.Iterator[block_engine_pb2.ProgramsOfInterestUpdate]: ...
+    @abc.abstractmethod
     def StartExpiringPacketStream(
         self,
         request_iterator: collections.abc.Iterator[block_engine_pb2.PacketBatchUpdate],

Різницю між файлами не показано, бо вона завелика
+ 0 - 0
jito_searcher_client/jito_searcher_client/generated/searcher_pb2.py


+ 54 - 0
jito_searcher_client/jito_searcher_client/generated/searcher_pb2.pyi

@@ -68,6 +68,60 @@ class SendBundleResponse(google.protobuf.message.Message):
 
 global___SendBundleResponse = SendBundleResponse
 
+@typing_extensions.final
+class ProgramSubscriptionV0(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROGRAMS_FIELD_NUMBER: builtins.int
+    @property
+    def programs(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    def __init__(
+        self,
+        *,
+        programs: collections.abc.Iterable[builtins.str] | None = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["programs", b"programs"]) -> None: ...
+
+global___ProgramSubscriptionV0 = ProgramSubscriptionV0
+
+@typing_extensions.final
+class WriteLockedAccountSubscriptionV0(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    ACCOUNTS_FIELD_NUMBER: builtins.int
+    @property
+    def accounts(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: ...
+    def __init__(
+        self,
+        *,
+        accounts: collections.abc.Iterable[builtins.str] | None = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["accounts", b"accounts"]) -> None: ...
+
+global___WriteLockedAccountSubscriptionV0 = WriteLockedAccountSubscriptionV0
+
+@typing_extensions.final
+class MempoolSubscription(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROGRAM_V0_SUB_FIELD_NUMBER: builtins.int
+    WLA_V0_SUB_FIELD_NUMBER: builtins.int
+    @property
+    def program_v0_sub(self) -> global___ProgramSubscriptionV0: ...
+    @property
+    def wla_v0_sub(self) -> global___WriteLockedAccountSubscriptionV0: ...
+    def __init__(
+        self,
+        *,
+        program_v0_sub: global___ProgramSubscriptionV0 | None = ...,
+        wla_v0_sub: global___WriteLockedAccountSubscriptionV0 | None = ...,
+    ) -> None: ...
+    def HasField(self, field_name: typing_extensions.Literal["msg", b"msg", "program_v0_sub", b"program_v0_sub", "wla_v0_sub", b"wla_v0_sub"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["msg", b"msg", "program_v0_sub", b"program_v0_sub", "wla_v0_sub", b"wla_v0_sub"]) -> None: ...
+    def WhichOneof(self, oneof_group: typing_extensions.Literal["msg", b"msg"]) -> typing_extensions.Literal["program_v0_sub", "wla_v0_sub"] | None: ...
+
+global___MempoolSubscription = MempoolSubscription
+
 @typing_extensions.final
 class PendingTxSubscriptionRequest(google.protobuf.message.Message):
     DESCRIPTOR: google.protobuf.descriptor.Descriptor

+ 35 - 0
jito_searcher_client/jito_searcher_client/generated/searcher_pb2_grpc.py

@@ -25,6 +25,11 @@ class SearcherServiceStub(object):
                 request_serializer=searcher__pb2.PendingTxSubscriptionRequest.SerializeToString,
                 response_deserializer=searcher__pb2.PendingTxNotification.FromString,
                 )
+        self.SubscribeMempool = channel.unary_stream(
+                '/searcher.SearcherService/SubscribeMempool',
+                request_serializer=searcher__pb2.MempoolSubscription.SerializeToString,
+                response_deserializer=searcher__pb2.PendingTxNotification.FromString,
+                )
         self.SendBundle = channel.unary_unary(
                 '/searcher.SearcherService/SendBundle',
                 request_serializer=searcher__pb2.SendBundleRequest.SerializeToString,
@@ -61,6 +66,14 @@ class SearcherServiceServicer(object):
     def SubscribePendingTransactions(self, request, context):
         """RPC endpoint to subscribe to pending transactions. Clients can provide a list of base58 encoded accounts.
         Any transactions that write-lock the provided accounts will be streamed to the searcher.
+        NOTE: DEPRECATED SOON!!!
+        """
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def SubscribeMempool(self, request, context):
+        """RPC endpoint to subscribe to mempool based on a few filters
         """
         context.set_code(grpc.StatusCode.UNIMPLEMENTED)
         context.set_details('Method not implemented!')
@@ -106,6 +119,11 @@ def add_SearcherServiceServicer_to_server(servicer, server):
                     request_deserializer=searcher__pb2.PendingTxSubscriptionRequest.FromString,
                     response_serializer=searcher__pb2.PendingTxNotification.SerializeToString,
             ),
+            'SubscribeMempool': grpc.unary_stream_rpc_method_handler(
+                    servicer.SubscribeMempool,
+                    request_deserializer=searcher__pb2.MempoolSubscription.FromString,
+                    response_serializer=searcher__pb2.PendingTxNotification.SerializeToString,
+            ),
             'SendBundle': grpc.unary_unary_rpc_method_handler(
                     servicer.SendBundle,
                     request_deserializer=searcher__pb2.SendBundleRequest.FromString,
@@ -170,6 +188,23 @@ class SearcherService(object):
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
 
+    @staticmethod
+    def SubscribeMempool(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_stream(request, target, '/searcher.SearcherService/SubscribeMempool',
+            searcher__pb2.MempoolSubscription.SerializeToString,
+            searcher__pb2.PendingTxNotification.FromString,
+            options, channel_credentials,
+            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
+
     @staticmethod
     def SendBundle(request,
             target,

+ 14 - 0
jito_searcher_client/jito_searcher_client/generated/searcher_pb2_grpc.pyi

@@ -23,7 +23,13 @@ class SearcherServiceStub:
     ]
     """RPC endpoint to subscribe to pending transactions. Clients can provide a list of base58 encoded accounts.
     Any transactions that write-lock the provided accounts will be streamed to the searcher.
+    NOTE: DEPRECATED SOON!!!
     """
+    SubscribeMempool: grpc.UnaryStreamMultiCallable[
+        searcher_pb2.MempoolSubscription,
+        searcher_pb2.PendingTxNotification,
+    ]
+    """RPC endpoint to subscribe to mempool based on a few filters"""
     SendBundle: grpc.UnaryUnaryMultiCallable[
         searcher_pb2.SendBundleRequest,
         searcher_pb2.SendBundleResponse,
@@ -62,8 +68,16 @@ class SearcherServiceServicer(metaclass=abc.ABCMeta):
     ) -> collections.abc.Iterator[searcher_pb2.PendingTxNotification]:
         """RPC endpoint to subscribe to pending transactions. Clients can provide a list of base58 encoded accounts.
         Any transactions that write-lock the provided accounts will be streamed to the searcher.
+        NOTE: DEPRECATED SOON!!!
         """
     @abc.abstractmethod
+    def SubscribeMempool(
+        self,
+        request: searcher_pb2.MempoolSubscription,
+        context: grpc.ServicerContext,
+    ) -> collections.abc.Iterator[searcher_pb2.PendingTxNotification]:
+        """RPC endpoint to subscribe to mempool based on a few filters"""
+    @abc.abstractmethod
     def SendBundle(
         self,
         request: searcher_pb2.SendBundleRequest,

+ 1 - 1
mev-protos

@@ -1 +1 @@
-Subproject commit 5213a506c09543743f843c2c825e6da65d85d561
+Subproject commit 0b1c7494ee9798f7c1df154eca8ae43a38e8c6b6

Деякі файли не було показано, через те що забагато файлів було змінено