Degen Code

Degen Code

Share this post

Degen Code
Degen Code
Uniswap — Transaction Prediction (Part I)
Copy link
Facebook
Email
Notes
More

Uniswap — Transaction Prediction (Part I)

Building Our Mempool Life Raft

Jan 28, 2023
∙ Paid
10

Share this post

Degen Code
Degen Code
Uniswap — Transaction Prediction (Part I)
Copy link
Facebook
Email
Notes
More
Share

My V2 mempool exploration was both fun and informative, but I was never a big fan of doing so much work inside the watch_pending_transactions function.

Some objections:

  • Extreme verbosity — many lines of code.

  • Not portable — differences among the various DEX code bases made it necessary to handle several function names and token types inside a block of code. Adding V3 functions was even worse, requiring a completely new block of dense code.

  • Monolithic — the watch_pending_transactions function should have a single concern (watching pending transactions, naturally). Giving it the related job of decoding and simulating the effect of observed mempool transactions was too much.

So with V3 I’m going to do it right!

First, let’s talk about what I want to do, then we’ll get around to doing it.

Design Improvements

Instead of asking the execution worker (the bot) to handle the task of simulating and predicting mempool transactions, I will instead shift the task to the liquidity pool helpers. I made a similar choice with the recent UniswapLpCycle helper, which wrapped the payload generation duty into the class, which really cleaned things up on the execution side.

By shifting the burden of prediction to the liquidity helper, the execution worker is now free to observe, route, and execute. Whenever a new mempool transaction is observed, it can be quickly identified as interesting, then added to a queue for further processing.

General Architecture

The helper we design should have a generic interface that is ideally repeatable across the generic DeFi ecosystem, and broadly applicable at worst. A few broad concepts:

  • Every transaction will have some associated state-changing effect. This usually results in a liquidity pool undergoing a set of changes in value.

  • Every transaction will encode some amounts associated with the proposed state-change. A V2 pool swap, for example, could specify an swap amount in and a minimum swap amount out. Whether this swap is possible is determined entirely by the pool state at the time of block building.

  • Every transaction will encode some associated gas values that effect how quickly it will be selected and recorded by validators / miners / searchers.

The gas values are not particularly relevant from the point of view of the liquidity pool, so we will disregard those and consider the state-changing effects only.

In essence, the question we want to answer when observing a given transaction is “what is the liquidity pool state after this transaction executes?”

To determine this information, we need a sensible way to express a given transaction in terms of its requested state-changes .

Let’s think about a simple V2 swap using the function swapExactTokensForTokens. It will be submitted with the following values:

  • uint amountIn

  • uint amountOutMin

  • address[] calldata path

  • address to

  • uint deadline

If these don’t look immediately familiar to you, read the archive!

If asked to do swap prediction, a V2 liquidity pool helper would care about amountIn and, to a lesser extent, amountOutMin. It would rely on the router to split apart the path (a list of tokens), enforce the deadline, perform the swaps, then forward the appropriate final amount onward to the to address.

So if we wanted to design interfaces to handle this event, we’d want to handle two related “packets” of information:

  • The requested swap amounts

  • The token path

For a simple swap with two tokens held by a common pool, a V2 liquidity pool helper can do it all!

But what if the swap isn’t simple, and crosses multiple pools? Now we have to consider a lot more. This complex swap must now keep track of multiple pool states.

The temptation is to extend the liquidity pool helpers to support this mechanism, but that’s a trap that will lead us into clunky helpers that do too much (same problem at managing this at the execution level, just in a different spot).

Tracking pool states across different helpers is similar in purpose to the arbitrage helper. That helper maintains a reference to all pool helpers involved along its path, maintains a copy of the state from each of those pools, and intelligently retrieves information from those pool helpers when needed to perform specialized calculations.

We can take the same approach here.

I propose creating a new helper class, tasked with analyzing the effect of an arbitrary transaction. We need to define it and give it a sensible name. Since it will monitor and predict the effect of Uniswap transactions, we’ll call it UniswapTransaction. We’ll make it V2/V3 from the start to avoid clumsy integrations later.

Beginning with the end in mind, I also propose creating a base class to derive from called Transaction. UniswapTransaction will be derived from Transaction. This generalized Transaction class can be used in the future to derive specific transaction helpers for other ecosystems (CurveTransaction, OpenseaTransaction, AaveTransaction, etc.)

State Tracking

A critical question: what state should this helper store and what state should it defer to others?

When we observe a Uniswap transaction, we care primarily about how the transaction will affect the liquidity pools along its path. So at minimum, the transaction helper should maintain a link to each pool along its path, and their current states (reserves, liquidity, price, tick, etc.)

Building the Helper

To begin, I create an abstract base class. It will be used to derive all other transaction helpers:

degenbot/transaction/base.py

from abc import ABC


class Transaction(ABC):
    pass

Then begin deriving my UniswapTransaction class:

degenbot/transaction/uniswap_transaction.py

from .base import Transaction


class UniswapTransaction(Transaction):
    pass

Now let’s consider how the typical lifetime of the helper. Whenever the execution worker observes an interesting transaction, it can instantiate one of these. We should expect that the execution worker will have all necessary information to build this helper, so the helper itself can be “dumb” and just receive the relevant information in its constructor.

A Uniswap transaction helper should be aware of the pool objects involved, a unique identifier for the transaction (its hash works well), and the parameters of the transaction itself.

from .base import Transaction

from typing import Union, List
from degenbot.uniswap.v2 import LiquidityPool
from degenbot.uniswap.v3 import V3LiquidityPool


class UniswapTransaction(Transaction):
    def __init__(
        self,
        hash: str,
        transaction_func: str,
        transaction_params: dict,
        pools: List[Union[LiquidityPool, V3LiquidityPool]],
    ):
        self.pools = pools
        self.hash = hash
        self.transaction_func = transaction_func
        self.transaction_params = transaction_params

        print(f"Identified {transaction_func} transaction with params:")
        # process the key-value pairs in the transaction dictionary
        for key, value in transaction_params.items():
            print(f" • {key} = {value}")

I’ve added some light type hinting here, but will largely skip the input validation in the constructor. After further defining the class, it will raise exceptions if pools does not contain the correct object types, the list of pools does not match the path in transaction_params, the transaction_params dictionary is missing keys, etc.

The transaction_params dict should match the dictionary format provided by web3py’s decode_function_input method. Here’s an example of a V2 swap I pulled from the pending transactions running through my node:

{
    'amountOutMin': 6800518, 
    'path': [
        '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', 
        '0xdAC17F958D2ee523a2206206994597C13D831ec7'
    ], 
    'to': '0xde27FA3C1dB3018ffccC78cF37BA30Ff679e2833', 
    'deadline': 1627464375530
}

Now let’s build a simple watcher script that connects to the node, waits for any transaction going to a Uniswap router, decodes it, then builds the UniswapTransaction object.

It’s extremely basic but illustrates how the execution worker can offload the transaction-specific details to the helper after it decodes the transaction.

ethereum_pending_tx_watcher.py

import asyncio
import json
import websockets
import sys
import web3
import brownie
import os

import degenbot as bot

os.environ["ETHERSCAN_TOKEN"] = "[redacted]"
WEBSOCKET_URI = "ws://localhost:8546"
w3 = web3.Web3(web3.WebsocketProvider(WEBSOCKET_URI))
brownie.network.connect("mainnet-local")


async def main():
    await asyncio.gather(asyncio.create_task(watch_events()))


async def watch_events():
    async for websocket in websockets.connect(uri=WEBSOCKET_URI):
        try:
            await websocket.send(
                json.dumps(
                    {
                        "id": 1,
                        "method": "eth_subscribe",
                        "params": ["newPendingTransactions"],
                    }
                )
            )
            result = await websocket.recv()
            print(result)

            while True:
                message = await websocket.recv()
                tx_hash = json.loads(message)["params"]["result"]

                try:
                    transaction = w3.eth.get_transaction(tx_hash)
                except Exception as e:
                    # ignore any TX that cannot be found
                    continue

                if transaction["to"] in ROUTERS.keys():
                    func_object, func_parameters = w3.eth.contract(
                        address=transaction["to"],
                        abi=ROUTERS[transaction["to"]]["abi"],
                    ).decode_function_input(transaction["input"])

                    print()
                    tx_helper = bot.transaction.UniswapTransaction(
                        hash=transaction.hash.hex(),
                        transaction_func=func_object.fn_name,
                        transaction_params=func_parameters,
                        pools=[],
                    )

                    del tx_helper

        except websockets.ConnectionClosed:
            print("reconnecting...")
            continue
        except Exception as e:
            print(e)


ROUTERS = {
    w3.toChecksumAddress(router_address): dex_info
    for dex in [
        {
            "0xd9e1cE17f2641f24aE83637ab66a2cca9C378B9F": {
                "name": "Sushiswap",
                "uniswap_version": 2,
            }
        },
        {
            "0xf164fC0Ec4E93095b804a4795bBe1e041497b92a": {
                "name": "UniswapV2: Router",
                "uniswap_version": 2,
            }
        },
        {
            "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D": {
                "name": "UniswapV2: Router 2",
                "uniswap_version": 2,
            }
        },
        {
            "0xE592427A0AEce92De3Edee1F18E0157C05861564": {
                "name": "UniswapV3: Router",
                "uniswap_version": 3,
            }
        },
        {
            "0x68b3465833fb72A70ecDF485E0e4C7bD8665Fc45": {
                "name": "UniswapV3: Router 2",
                "uniswap_version": 3,
            }
        },
    ]
    for router_address, dex_info in dex.items()
}


for router_address in ROUTERS.keys():

    factoryV2_address = None
    factoryV2_contract = None

    try:
        router_contract = brownie.Contract(router_address)
    except Exception as e:
        print(e)
        try:
            router_contract = brownie.Contract.from_explorer(
                router_address
            )
        except Exception as e:
            print(e)
    else:
        ROUTERS[router_address]["abi"] = router_contract.abi
        ROUTERS[router_address]["web3_contract"] = w3.eth.contract(
            address=router_address,
            abi=router_contract.abi,
        )

    uniswap_version = ROUTERS[router_address].get("uniswap_version")
    assert uniswap_version in [2, 3]

    if uniswap_version == 2:
        try:
            factoryV2_address = w3.toChecksumAddress(
                router_contract.factory()
            )
        except Exception as e:
            print(e)
        else:
            try:
                factoryV2_contract = brownie.Contract(factoryV2_address)
            except Exception as e:
                print(e)
                try:
                    factoryV2_contract = brownie.Contract.from_explorer(
                        factoryV2_address
                    )
                except Exception as e:
                    print(e)
        finally:
            if factoryV2_address and factoryV2_contract:
                ROUTERS[router_address][
                    "factoryV2_address"
                ] = factoryV2_address
                ROUTERS[router_address][
                    "factoryV2_contract"
                ] = factoryV2_contract
            else:
                sys.exit("FAILED TO LOAD V2 FACTORY")

    elif uniswap_version == 3:
        try:
            factoryV2_address = w3.toChecksumAddress(
                router_contract.factoryV2()
            )
        except AttributeError:
            pass
        except Exception as e:
            print(e)
        else:
            try:
                factoryV2_contract = brownie.Contract(factoryV2_address)
            except:
                factoryV2_contract = brownie.Contract.from_explorer(
                    factoryV2_address
                )
        finally:
            if factoryV2_address and factoryV2_contract:
                ROUTERS[router_address][
                    "factoryV2_address"
                ] = factoryV2_address
                ROUTERS[router_address][
                    "factoryV2_contract"
                ] = factoryV2_contract

        try:
            factoryV3_address = w3.toChecksumAddress(
                router_contract.factory()
            )
        except Exception as e:
            print(e)
        else:
            try:
                factoryV3_contract = brownie.Contract(factoryV3_address)
            except:
                factoryV3_contract = brownie.Contract.from_explorer(
                    factoryV3_address
                )
        finally:
            if factoryV3_address and factoryV3_contract:
                ROUTERS[router_address][
                    "factoryV3_address"
                ] = factoryV3_address
                ROUTERS[router_address][
                    "factoryV3_contract"
                ] = factoryV3_contract
            else:
                sys.exit("FAILED TO LOAD V3 FACTORY")


if __name__ == "__main__":

    asyncio.run(main())

NOTE: I have passed an empty pools list since I don’t intend to do any real processing (yet).

Running it for a bit, many Uniswap transactions appear:

Identified swapExactETHForTokensSupportingFeeOnTransferTokens transaction with params:
 • amountOutMin = 3371839655230
 • path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xd6EfAfe584A56a359ce8d752339f696209Fd874E']
 • to = 0x7a0C47ca5b68ef655924E0BF7088f8Dc5afc61B0
 • deadline = 1674930320

Identified swapExactETHForTokens transaction with params:
 • amountOutMin = 73973508291586
 • path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xbE5958a396115a5D6E9c9D86ca683fa3D642569E']
 • to = 0x111aCd3B56b12090fe422E8CEdC2aBE6C5b92b65
 • deadline = 1674930204

Identified swapExactETHForTokens transaction with params:
 • amountOutMin = 504889116269959214561326906
 • path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0x378bF3A945891D9C7048D3615F71bA4efdA8EC72']
 • to = 0xC8CD043B79509Cc3fea214e4946cfB74a9825b45
 • deadline = 1674930505

Identified swapExactETHForTokens transaction with params:
 • amountOutMin = 100093038836420359387
 • path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xa5f2211B9b8170F694421f2046281775E8468044']
 • to = 0xABf11abcc905277D9EB43F3c8Ef12Ca81101a3a7
 • deadline = 1674931871

[...]

I want to filter this down without adding too much complexity, so I’m going to watch only transactions where the USDC token (address 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48) and USDT token (0xdAC17F958D2ee523a2206206994597C13D831ec7) appears in the swap path. There’s a lot of activity with these tokens so it should not take long to see results.

Modify this code block slightly to include that token filter:

if transaction["to"] in ROUTERS.keys():
    func_object, func_parameters = w3.eth.contract(
        address=transaction["to"],
        abi=ROUTERS[transaction["to"]]["abi"],
    ).decode_function_input(transaction["input"])

    if func_parameters.get("path") and (
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
            in func_parameters["path"]
        )
        or (
            "0xdAC17F958D2ee523a2206206994597C13D831ec7"
            in func_parameters["path"]
        )
    ):
        print()
        tx_helper = bot.transaction.UniswapTransaction(
            hash=transaction.hash.hex(),
            transaction_func=func_object.fn_name,
            transaction_params=func_parameters,
            pools=[],
        )

        del tx_helper

Now I see fewer transactions when I run the script.

Identified swapTokensForExactTokens transaction with params:
 • amountOut = 1894810597757438699773
 • amountInMax = 1781432504
 • path = ['0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0x0f51bb10119727a7e5eA3538074fb341F56B09Ad']
 • to = 0x28B2399C9aB816351351851952f2981BC9fb69Aa
 • deadline = 9999999999999

Identified swapExactETHForTokens transaction with params:
 • amountOutMin = 129238599547
 • path = ['0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0x2b591e99afE9f32eAA6214f7B7629768c40Eeb39']
 • to = 0x25e7eA5986ebE98a576C124007c1454628f5bEBF
 • deadline = 1630622022

[...]

Now I’ll make an “on demand” pool loader that will fetch the V2 pool address from the factory, create a LiquidityPool helper for it, then proceed to build the transaction helper from those pools:

###### START OF ADDED CODE
tx_token_pairs = [
    pair
    for pair in itertools.pairwise(
        func_parameters["path"]
    )
]

tx_lp_objects = []

for token0, token1 in tx_token_pairs:
    print(f"{token0} → {token1}")

    # get info from the V2 factory
    lp_address = ROUTERS[transaction["to"]][
        "factoryV2_contract"
    ].getPair(token0, token1)

    # build a LiquidityPool object from the token addresses
    lp_helper = bot.uniswap.v2.LiquidityPool(
        address=lp_address, silent=True
    )
    tx_lp_objects.append(lp_helper)
###### END OF ADDED CODE

And add another code block to the helper that displays pool info:

print("Identified pools:")
for pool in pools:
    print(f" • {pool}")

Now the script is displaying a lot more stuff:

Identified swapTokensForExactTokens transaction with params:
 • amountOut = 95988860000000000000000
 • amountInMax = 172355384
 • path = ['0xdAC17F958D2ee523a2206206994597C13D831ec7', '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0x5CB3ce6D081fB00d5f6677d196f2d70010EA3f4a']
 • to = 0xfB0fce91022Ccf15f1CfC247B77047C21fC742C0
 • deadline = 1675112364
Identified pools:
 • WETH-USDT (V2, 0.30%)
 • BUSY-WETH (V2, 0.30%)

Identified swapExactTokensForTokens transaction with params:
 • amountIn = 200000000
 • amountOutMin = 97197016356944748230098992
 • path = ['0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', '0xcb44CDEdFC80a07BC794Ce285Dff3ACF6b1d7909']
 • to = 0x4BeA976f3FD74935bba444E58388a4F6b205DAb1
 • deadline = 1674933515
Identified pools:
 • USDC-WETH (V2, 0.30%)
 • WETH-MRSHIB (V2, 0.30%)

[...]

NOTE: 😂 at the MRSHIB token. You see the weirdest shitcoins when you’re playing around on mainnet.

Transaction Simulation

Now let’s add one more feature.

This will be limited to a single case (swapExactTokensForTokens) for sake of brevity. We will build a new method called simulate that will model the effect of the pending transaction on the current pool state. It will return a dictionary with these future state values.

This post is for paid subscribers

Already a paid subscriber? Sign in
© 2025 BowTiedDevil
Privacy ∙ Terms ∙ Collection notice
Start writingGet the app
Substack is the home for great culture

Share

Copy link
Facebook
Email
Notes
More