My V2 mempool exploration was both fun and informative, but I was never a big fan of doing so much work inside the watch_pending_transactions
function.
Some objections:
Extreme verbosity — many lines of code.
Not portable — differences among the various DEX code bases made it necessary to handle several function names and token types inside a block of code. Adding V3 functions was even worse, requiring a completely new block of dense code.
Monolithic — the
watch_pending_transactions
function should have a single concern (watching pending transactions, naturally). Giving it the related job of decoding and simulating the effect of observed mempool transactions was too much.
So with V3 I’m going to do it right!
First, let’s talk about what I want to do, then we’ll get around to doing it.
Design Improvements
Instead of asking the execution worker (the bot) to handle the task of simulating and predicting mempool transactions, I will instead shift the task to the liquidity pool helpers. I made a similar choice with the recent UniswapLpCycle
helper, which wrapped the payload generation duty into the class, which really cleaned things up on the execution side.
By shifting the burden of prediction to the liquidity helper, the execution worker is now free to observe, route, and execute. Whenever a new mempool transaction is observed, it can be quickly identified as interesting, then added to a queue for further processing.
General Architecture
The helper we design should have a generic interface that is ideally repeatable across the generic DeFi ecosystem, and broadly applicable at worst. A few broad concepts:
Every transaction will have some associated state-changing effect. This usually results in a liquidity pool undergoing a set of changes in value.
Every transaction will encode some amounts associated with the proposed state-change. A V2 pool swap, for example, could specify an swap amount in and a minimum swap amount out. Whether this swap is possible is determined entirely by the pool state at the time of block building.
Every transaction will encode some associated gas values that effect how quickly it will be selected and recorded by validators / miners / searchers.
The gas values are not particularly relevant from the point of view of the liquidity pool, so we will disregard those and consider the state-changing effects only.
In essence, the question we want to answer when observing a given transaction is “what is the liquidity pool state after this transaction executes?”
To determine this information, we need a sensible way to express a given transaction in terms of its requested state-changes .
Let’s think about a simple V2 swap using the function swapExactTokensForTokens. It will be submitted with the following values:
uint amountIn
uint amountOutMin
address[] calldata path
address to
uint deadline
If these don’t look immediately familiar to you, read the archive!
If asked to do swap prediction, a V2 liquidity pool helper would care about amountIn
and, to a lesser extent, amountOutMin
. It would rely on the router to split apart the path
(a list of tokens), enforce the deadline
, perform the swaps, then forward the appropriate final amount onward to the to
address.
So if we wanted to design interfaces to handle this event, we’d want to handle two related “packets” of information:
The requested swap amounts
The token path
For a simple swap with two tokens held by a common pool, a V2 liquidity pool helper can do it all!
But what if the swap isn’t simple, and crosses multiple pools? Now we have to consider a lot more. This complex swap must now keep track of multiple pool states.
The temptation is to extend the liquidity pool helpers to support this mechanism, but that’s a trap that will lead us into clunky helpers that do too much (same problem at managing this at the execution level, just in a different spot).
Tracking pool states across different helpers is similar in purpose to the arbitrage helper. That helper maintains a reference to all pool helpers involved along its path, maintains a copy of the state from each of those pools, and intelligently retrieves information from those pool helpers when needed to perform specialized calculations.
We can take the same approach here.
I propose creating a new helper class, tasked with analyzing the effect of an arbitrary transaction. We need to define it and give it a sensible name. Since it will monitor and predict the effect of Uniswap transactions, we’ll call it UniswapTransaction
. We’ll make it V2/V3 from the start to avoid clumsy integrations later.
Beginning with the end in mind, I also propose creating a base class to derive from called Transaction
. UniswapTransaction
will be derived from Transaction
. This generalized Transaction
class can be used in the future to derive specific transaction helpers for other ecosystems (CurveTransaction, OpenseaTransaction
, AaveTransaction
, etc.)
State Tracking
A critical question: what state should this helper store and what state should it defer to others?
When we observe a Uniswap transaction, we care primarily about how the transaction will affect the liquidity pools along its path. So at minimum, the transaction helper should maintain a link to each pool along its path, and their current states (reserves, liquidity, price, tick, etc.)
Building the Helper
To begin, I create an abstract base class. It will be used to derive all other transaction helpers:
degenbot/transaction/base.py
from abc import ABC
class Transaction(ABC):
pass
Then begin deriving my UniswapTransaction
class:
degenbot/transaction/uniswap_transaction.py
from .base import Transaction
class UniswapTransaction(Transaction):
pass
Now let’s consider how the typical lifetime of the helper. Whenever the execution worker observes an interesting transaction, it can instantiate one of these. We should expect that the execution worker will have all necessary information to build this helper, so the helper itself can be “dumb” and just receive the relevant information in its constructor.
A Uniswap transaction helper should be aware of the pool objects involved, a unique identifier for the transaction (its hash works well), and the parameters of the transaction itself.
from .base import Transaction
from typing import Union, List
from degenbot.uniswap.v2 import LiquidityPool
from degenbot.uniswap.v3 import V3LiquidityPool
class UniswapTransaction(Transaction):
def __init__(
self,
hash: str,
transaction_func: str,
transaction_params: dict,
pools: List[Union[LiquidityPool, V3LiquidityPool]],
):
self.pools = pools
self.hash = hash
self.transaction_func = transaction_func
self.transaction_params = transaction_params
print(f"Identified {transaction_func} transaction with params:")
# process the key-value pairs in the transaction dictionary
for key, value in transaction_params.items():
print(f" • {key} = {value}")
I’ve added some light type hinting here, but will largely skip the input validation in the constructor. After further defining the class, it will raise exceptions if pools
does not contain the correct object types, the list of pools
does not match the path
in transaction_params
, the transaction_params
dictionary is missing keys, etc.
The transaction_params
dict should match the dictionary format provided by web3py
’s decode_function_input
method. Here’s an example of a V2 swap I pulled from the pending transactions running through my node:
{
'amountOutMin': 6800518,
'path': [
'0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',
'0xdAC17F958D2ee523a2206206994597C13D831ec7'
],
'to': '0xde27FA3C1dB3018ffccC78cF37BA30Ff679e2833',
'deadline': 1627464375530
}
Now let’s build a simple watcher script that connects to the node, waits for any transaction going to a Uniswap router, decodes it, then builds the UniswapTransaction
object.
It’s extremely basic but illustrates how the execution worker can offload the transaction-specific details to the helper after it decodes the transaction.
ethereum_pending_tx_watcher.py
import asyncio
import json
import websockets
import sys
import web3
import brownie
import os
import degenbot as bot
os.environ["ETHERSCAN_TOKEN"] = "[redacted]"
WEBSOCKET_URI = "ws://localhost:8546"
w3 = web3.Web3(web3.WebsocketProvider(WEBSOCKET_URI))
brownie.network.connect("mainnet-local")
async def main():
await asyncio.gather(asyncio.create_task(watch_events()))
async def watch_events():
async for websocket in websockets.connect(uri=WEBSOCKET_URI):
try:
await websocket.send(
json.dumps(
{
"id": 1,
"method": "eth_subscribe",
"params": ["newPendingTransactions"],
}
)
)
result = await websocket.recv()
print(result)
while True:
message = await websocket.recv()
tx_hash = json.loads(message)["params"]["result"]
try:
transaction = w3.eth.get_transaction(tx_hash)
except Exception as e:
# ignore any TX that cannot be found
continue
if transaction["to"] in ROUTERS.keys():
func_object, func_parameters = w3.eth.contract(
address=transaction["to"],
abi=ROUTERS[transaction["to"]]["abi"],
).decode_function_input(transaction["input"])
print()
tx_helper = bot.transaction.UniswapTransaction(
hash=transaction.hash.hex(),
transaction_func=func_object.fn_name,
transaction_params=func_parameters,
pools=[],
)
del tx_helper
except websockets.ConnectionClosed:
print("reconnecting...")
continue
except Exception as e:
print(e)
ROUTERS = {
w3.toChecksumAddress(router_address): dex_info
for dex in [
{
"0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F": {
"name": "Sushiswap",
"uniswap_version": 2,
}
},
{
"0xf164fC0Ec4E93095b804a4795bBe1e041497b92a": {
"name": "UniswapV2: Router",
"uniswap_version": 2,
}
},
{
"0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D": {
"name": "UniswapV2: Router 2",
"uniswap_version": 2,
}
},
{
"0xE592427A0AEce92De3Edee1F18E0157C05861564": {
"name": "UniswapV3: Router",
"uniswap_version": 3,
}
},
{
"0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45": {
"name": "UniswapV3: Router 2",
"uniswap_version": 3,
}
},
]
for router_address, dex_info in dex.items()
}
for router_address in ROUTERS.keys():
factoryV2_address = None
factoryV2_contract = None
try:
router_contract = brownie.Contract(router_address)
except Exception as e:
print(e)
try:
router_contract = brownie.Contract.from_explorer(
router_address
)
except Exception as e:
print(e)
else:
ROUTERS[router_address]["abi"] = router_contract.abi
ROUTERS[router_address]["web3_contract"] = w3.eth.contract(
address=router_address,
abi=router_contract.abi,
)
uniswap_version = ROUTERS[router_address].get("uniswap_version")
assert uniswap_version in [2, 3]
if uniswap_version == 2:
try:
factoryV2_address = w3.toChecksumAddress(
router_contract.factory()
)
except Exception as e:
print(e)
else:
try:
factoryV2_contract = brownie.Contract(factoryV2_address)
except Exception as e:
print(e)
try:
factoryV2_contract = brownie.Contract.from_explorer(
factoryV2_address
)
except Exception as e:
print(e)
finally:
if factoryV2_address and factoryV2_contract:
ROUTERS[router_address][
"factoryV2_address"
] = factoryV2_address
ROUTERS[router_address][
"factoryV2_contract"
] = factoryV2_contract
else:
sys.exit("FAILED TO LOAD V2 FACTORY")
elif uniswap_version == 3:
try:
factoryV2_address = w3.toChecksumAddress(
router_contract.factoryV2()
)
except AttributeError:
pass
except Exception as e:
print(e)
else:
try:
factoryV2_contract = brownie.Contract(factoryV2_address)
except:
factoryV2_contract = brownie.Contract.from_explorer(
factoryV2_address
)
finally:
if factoryV2_address and factoryV2_contract:
ROUTERS[router_address][
"factoryV2_address"
] = factoryV2_address
ROUTERS[router_address][
"factoryV2_contract"
] = factoryV2_contract
try:
factoryV3_address = w3.toChecksumAddress(
router_contract.factory()
)
except Exception as e:
print(e)
else:
try:
factoryV3_contract = brownie.Contract(factoryV3_address)
except:
factoryV3_contract = brownie.Contract.from_explorer(
factoryV3_address
)
finally:
if factoryV3_address and factoryV3_contract:
ROUTERS[router_address][
"factoryV3_address"
] = factoryV3_address
ROUTERS[router_address][
"factoryV3_contract"
] = factoryV3_contract
else:
sys.exit("FAILED TO LOAD V3 FACTORY")
if __name__ == "__main__":
asyncio.run(main())
NOTE: I have passed an empty pools
list since I don’t intend to do any real processing (yet).
Running it for a bit, many Uniswap transactions appear:
Identified swapExactETHForTokensSupportingFeeOnTransferTokens transaction with params:
• amountOutMin = 3371839655230
• path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xd6EfAfe584A56a359ce8d752339f696209Fd874E']
• to = 0x7a0C47ca5b68ef655924E0BF7088f8Dc5afc61B0
• deadline = 1674930320
Identified swapExactETHForTokens transaction with params:
• amountOutMin = 73973508291586
• path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xbE5958a396115a5D6E9c9D86ca683fa3D642569E']
• to = 0x111aCd3B56b12090fe422E8CEdC2aBE6C5b92b65
• deadline = 1674930204
Identified swapExactETHForTokens transaction with params:
• amountOutMin = 504889116269959214561326906
• path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0x378bF3A945891D9C7048D3615F71bA4efdA8EC72']
• to = 0xC8CD043B79509Cc3fea214e4946cfB74a9825b45
• deadline = 1674930505
Identified swapExactETHForTokens transaction with params:
• amountOutMin = 100093038836420359387
• path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xa5f2211B9b8170F694421f2046281775E8468044']
• to = 0xABf11abcc905277D9EB43F3c8Ef12Ca81101a3a7
• deadline = 1674931871
[...]
I want to filter this down without adding too much complexity, so I’m going to watch only transactions where the USDC token (address 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48) and USDT token (0xdAC17F958D2ee523a2206206994597C13D831ec7) appears in the swap path. There’s a lot of activity with these tokens so it should not take long to see results.
Modify this code block slightly to include that token filter:
if transaction["to"] in ROUTERS.keys():
func_object, func_parameters = w3.eth.contract(
address=transaction["to"],
abi=ROUTERS[transaction["to"]]["abi"],
).decode_function_input(transaction["input"])
if func_parameters.get("path") and (
"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
in func_parameters["path"]
)
or (
"0xdAC17F958D2ee523a2206206994597C13D831ec7"
in func_parameters["path"]
)
):
print()
tx_helper = bot.transaction.UniswapTransaction(
hash=transaction.hash.hex(),
transaction_func=func_object.fn_name,
transaction_params=func_parameters,
pools=[],
)
del tx_helper
Now I see fewer transactions when I run the script.
Identified swapTokensForExactTokens transaction with params:
• amountOut = 1894810597757438699773
• amountInMax = 1781432504
• path = ['0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0x0f51bb10119727a7e5eA3538074fb341F56B09Ad']
• to = 0x28B2399C9aB816351351851952f2981BC9fb69Aa
• deadline = 9999999999999
Identified swapExactETHForTokens transaction with params:
• amountOutMin = 129238599547
• path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0x2b591e99afE9f32eAA6214f7B7629768c40Eeb39']
• to = 0x25e7eA5986ebE98a576C124007c1454628f5bEBF
• deadline = 1630622022
[...]
Now I’ll make an “on demand” pool loader that will fetch the V2 pool address from the factory, create a LiquidityPool helper for it, then proceed to build the transaction helper from those pools:
###### START OF ADDED CODE
tx_token_pairs = [
pair
for pair in itertools.pairwise(
func_parameters["path"]
)
]
tx_lp_objects = []
for token0, token1 in tx_token_pairs:
print(f"{token0} → {token1}")
# get info from the V2 factory
lp_address = ROUTERS[transaction["to"]][
"factoryV2_contract"
].getPair(token0, token1)
# build a LiquidityPool object from the token addresses
lp_helper = bot.uniswap.v2.LiquidityPool(
address=lp_address, silent=True
)
tx_lp_objects.append(lp_helper)
###### END OF ADDED CODE
And add another code block to the helper that displays pool info:
print("Identified pools:")
for pool in pools:
print(f" • {pool}")
Now the script is displaying a lot more stuff:
Identified swapTokensForExactTokens transaction with params:
• amountOut = 95988860000000000000000
• amountInMax = 172355384
• path = ['0xdAC17F958D2ee523a2206206994597C13D831ec7', '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0x5CB3ce6D081fB00d5f6677d196f2d70010EA3f4a']
• to = 0xfB0fce91022Ccf15f1CfC247B77047C21fC742C0
• deadline = 1675112364
Identified pools:
• WETH-USDT (V2, 0.30%)
• BUSY-WETH (V2, 0.30%)
Identified swapExactTokensForTokens transaction with params:
• amountIn = 200000000
• amountOutMin = 97197016356944748230098992
• path = ['0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xcb44CDEdFC80a07BC794Ce285Dff3ACF6b1d7909']
• to = 0x4BeA976f3FD74935bba444E58388a4F6b205DAb1
• deadline = 1674933515
Identified pools:
• USDC-WETH (V2, 0.30%)
• WETH-MRSHIB (V2, 0.30%)
[...]
NOTE: 😂 at the MRSHIB token. You see the weirdest shitcoins when you’re playing around on mainnet.
Transaction Simulation
Now let’s add one more feature.
This will be limited to a single case (swapExactTokensForTokens
) for sake of brevity. We will build a new method called simulate
that will model the effect of the pending transaction on the current pool state. It will return a dictionary with these future state values.