Degen Code

Degen Code

Share this post

Degen Code
Degen Code
Project: Uniswap Router Backrunner

Project: Uniswap Router Backrunner

Mempool Transaction Prediction Wrap-up (Part LXIX)

Aug 12, 2023
∙ Paid
12

Share this post

Degen Code
Degen Code
Project: Uniswap Router Backrunner
1
Share

Building on the efforts of the Uniswap Transaction Prediction series (beginning at Part I, ending at Part VIII), this project demonstrates how to perform WETH cycle backrun arbitrage of mempool transactions to Uniswap routers.

There are several topics I want to cover:

  • Vyper contract performance improvements

  • Ape Framework

  • Async web3py

  • Foundry

  • Closed-pool searching (MEVblocker)

  • Wallet watching

  • Liquidations (Aave and others)

  • Local EVM simulation using pyrevm (and others)

  • Replaying transactions with sothis

  • Data extraction with cryo

  • Historical data recording with TimescaleDB

  • Visualization with Grafana

  • Distributed task queues with Celery

  • External API development with FastAPI

But I couldn’t cover any of these without finishing our exploration of transaction prediction and building a mempool-aware bot.

So…

LFG 🚀

New Features & Improvements

There are several pieces to highlight.

Replace web3-flashbots Middleware

The web3py flashbots middleware was a useful tool when we were learning, but it has several limitations.

  • It can only be used “out of the box” with one relay. If you want to connect to two relays, you must create two web3 objects and inject the middleware into each of them. That’s fine enough, but it’s a very heavy approach.

  • It is slow and consists of synchronous code that cannot be easily modified. It uses the requests module for interaction with the relay endpoint, which is robust but not performant when used in async code.

  • It tightly couples the code to the internals of web3.py and obscures the HTTP calls being used. It is difficult to extend to relays that are not call-for-call compatible with Flashbotrs.

To that end, I’ve removed the middleware and replaced it with lightweight implementations that call endpoints eth_callBundle (for simulation) and eth_sendBundle (for sending) via the aiohttp module for async HTTP.

A key advantage of this module is that I can easily use it with multiple Flashbots-compatible endpoints. The example uses Flashbots and Builder0x69. There are many more, and the reader should modify the list of relays (and the associated HTTP calls and headers) to integrate with those services.

Access Lists

This project integrates an access list generator that will use the eth_createAccessList endpoint of your geth (or compatible) node.

In testing I’ve found that the generated access list occasionally comes back much lower than expected. Profitability depends heavily on gas use, so I’ve included an “emergency relief” check that will abort the bundle if the final simulated gas usage exceeds the estimate.

Process Pool Calculation

The example uses a process pool executor to calculate arbitrage values across multiple processes. For high-scale bots this is a big performance improvement, assuming you have a capable machine with extra processing power. It will not help you on small VPS instances, for example.

Stateless Arbitrage Helper Calculation

I’ve been experimenting with stateless calculations within the UniswapLpCycle class. I’ve come to dislike the best attribute. It is a simple dictionary within each helper that stores the most recent arbitrage calculation results. It makes sense to store these values for an on-chain bot that may observe an arbitrage opportunity across several blocks and only broadcast the transaction when gas is appropriate. But for a transient mempool bot, there are several possible pool states associated with multiple transactions.

When using a process pool for calculation, you must maintain a mental model that keeps track of “which object am I using?” when interacting and passing data around. When I explored using the pickle module for liquidity snapshotting, I quickly realized that doing an arbitrage calculation in a separate process could get messy. Since the pickled/unpickled object in the side process is unrelated to the original helper object, it makes no sense to store the arbitrage calculation results in the best attribute, since that object gets garbage collected after the calculation is complete. My first hacky workaround was the calculate_arbitrage_return_best function, which just returned a copy of the best dictionary so that the results could be passed across the pickle module.

That was just for an on-chain bot that only needed to maintain results for a single pool state. But with a mempool bot, there are multiple pool states that might occur, and maintaining references to all of these within a single object is just a bookkeeping nightmare.

So I’ve taken a different approach that solves both challenges. Rather than attempting to store results within the helper, I’ve written a new “internal” method called _calculate that does the familiar calculation, but packages the results into a new data class called ArbitrageCalculationResult.

This data class is defined as:

@dataclasses.dataclass(slots=True, frozen=True)
class ArbitrageCalculationResult:
    id: str
    input_token: Erc20Token
    profit_token: Erc20Token
    input_amount: int
    profit_amount: int
    swap_amounts: List

The advantage of this approach is that I can extend the class as needed if more values are needed later.

The class has everything I need to evaluate the profitability of this particular arbitrage, and then to generate the payloads associated with it, without needing to store anything in the helper.

If you’re still using the calculate_arbitrage method, that’s fine. It still works without issue, and in fact defers its analysis to that same _calculate method. It’s now a wrapper that unpacks the ArbitrageCalculationResult at the end and stores the values in best.

I will be transitioning to this stateless approach going forward, but will leave these wrappers in place to avoid breaking any bots built on the old code.

How Do I Use The Results?

The project takes this approach (pseudo-code):

Get arbitrage result with an overridden pool state from helper
Evaluate the result for profitability
If profitable:
    Identify arbitrage helper from ID attribute
    Generate payload by sending swap_amounts back to arbitrage helper
    Broadcast transaction

Multi-Arbitrage Processing

A mempool transaction may pass through several pools. It is simple to identify arbitrage paths that align with these pools (see Part VII for the discussion on using Python’s set for this task).

After identifying all possible arbitrage paths that cross the pools in that transaction, you can evaluate their profitability at the new pool states.

Once you have that, what’s next?

It’s tempting to just sort by profit and execute the first one, but that is a simplistic approach that leaves money on the table.

Consider a swap with USDC → WETH → WBTC. This crosses two pools (USDC/WETH, and WETH/WBTC). Therefore any arbitrage path that uses either of these pools is eligible for analysis.

However, some arbitrage paths may only use one of the pools. Consider a 2pool arbitrage between the USDC/WETH pool at Uniswap and Sushiswap. It doesn’t care about the WETH/WBTC leg at all. And similarly, a 2pool arbitrage between the WETH/WBTC pools does not care about the USDC/WETH leg.

So you can generate two separate arbitrage transactions from this mempool swap. The challenge is sorting them, but again Python’s set makes this simple.

I won’t get too deep into the code here, but here is the pseudo-code that the bot uses to identify multiple arbitrage opportunities from a multi-pool transaction:

Calculate future pool states from mempool transaction
Identify all arbitrage paths that share a pool
Calculate profitability for all paths
Sort paths by gross profit, discarding profit < some threshold
While loop:
    Select/save the highest profit path from all paths
    Discard paths that share a pool with the selected arb
    [repeat]
Generate and broadcast transaction(s) for non-overlapping paths

An improvement for this technique is to package the multiple arbitrage transactions into a single bundle instead of generating multiple bundles. Smart readers can do this with little trouble.

New Pool Events

It’s common to observe new pools being created by the various Uniswap factories. The event watcher will now generate a pool helper for these newly-generated pools inside the process_new_v2_pool_event and process_new_v3_pool_event functions.

It doesn’t do much with them afterwards, but will set us up for interesting stuff later.

A future improvement will be an generator that can “hot load” an arbitrage helper when a new pool is created, so a bot could scale up automatically instead of needing restarts to add new paths.

Smart Contract — Flexible V3 Callback

The executor smart contract now reads the factory address in the uniswapV3SwapCallback function, and can verify pool addresses for pools generated by the Uniswap and Sushiswap factories.

Smart Contract — Balance Check

The executor smart contract implements a balance check in execute_payloads that reverts if the final WETH balance is reduced after all payloads are delivered. I introduced this feature HERE.

Source Code

Pool Fetcher

ethereum_lp_fetcher_uniswapv2_json.py

import brownie
import sys
import os
import json

BROWNIE_NETWORK = "mainnet-local"
os.environ["ETHERSCAN_TOKEN"] = EDITME

# maximum blocks to process with getLogs
BLOCK_SPAN = 1_000

try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "SushiSwap",
        "filename": "ethereum_lps_sushiswapv2.json",
        "factory_address": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac",
        "factory_deployment_block": 10_794_229,
        "pool_type": "SushiswapV2",
    },
    {
        "name": "Uniswap V2",
        "filename": "ethereum_lps_uniswapv2.json",
        "factory_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
        "factory_deployment_block": 10_000_835,
        "pool_type": "UniswapV2",
    },
]

current_block = brownie.chain.height

for name, factory_address, filename, deployment_block, pool_type in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
        exchange["pool_type"],
    )
    for exchange in exchanges
]:
    print(f"DEX: {name}")

    try:
        factory_contract = brownie.Contract(factory_address)
    except:
        try:
            factory_contract = brownie.Contract.from_explorer(factory_address)
        except:
            factory_contract = None
    finally:
        if factory_contract is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
        if i + BLOCK_SPAN > current_block:
            end_block = current_block
        else:
            end_block = i + BLOCK_SPAN

        if pool_created_events := factory_contract.events.PairCreated.getLogs(
            fromBlock=i, toBlock=end_block
        ):
            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.get("pair"),
                        "token0": event.args.get("token0"),
                        "token1": event.args.get("token1"),
                        "block_number": event.get("blockNumber"),
                        "pool_id": event.args.get(""),
                        "type": pool_type,
                    }
                )
        with open(filename, "w") as file:
            json.dump(lp_data, file, indent=2)

    print(
        f"Saved {len(lp_data)} pools ({len(lp_data) - previous_pool_count} new)"
    )

ethereum_lp_fetcher_uniswapv3_json.py

import brownie
import sys
import os
import json

BROWNIE_NETWORK = "mainnet-local"
os.environ["ETHERSCAN_TOKEN"] = EDITME

# maximum blocks to process with getLogs
BLOCK_SPAN = 5_000

try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "Uniswap V3",
        "filename": "ethereum_lps_uniswapv3.json",
        "factory_address": "0x1F98431c8aD98523631AE4a59f267346ea31F984",
        "factory_deployment_block": 12_369_621,
        "pool_type": "UniswapV3",
    },
    {
        "name": "Sushiswap V3",
        "filename": "ethereum_lps_sushiswapv3.json",
        "factory_address": "0xbACEB8eC6b9355Dfc0269C18bac9d6E2Bdc29C4F",
        "factory_deployment_block": 16_955_547,
        "pool_type": "SushiswapV3",
    },
]


current_block = brownie.chain.height

for name, factory_address, filename, deployment_block, pool_type in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
        exchange["pool_type"],
    )
    for exchange in exchanges
]:
    print(f"DEX: {name}")

    try:
        factory = brownie.Contract(factory_address)
    except:
        try:
            factory = brownie.Contract.from_explorer(factory_address)
        except:
            factory = None
    finally:
        if factory is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    factory_contract = brownie.web3.eth.contract(
        address=factory.address, abi=factory.abi
    )

    for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
        if i + BLOCK_SPAN > current_block:
            end_block = current_block
        else:
            end_block = i + BLOCK_SPAN

        if pool_created_events := factory_contract.events.PoolCreated.getLogs(
            fromBlock=i, toBlock=end_block
        ):
            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.pool,
                        "fee": event.args.fee,
                        "token0": event.args.token0,
                        "token1": event.args.token1,
                        "block_number": event.blockNumber,
                        "type": pool_type,
                    }
                )
        with open(filename, "w") as file:
            json.dump(lp_data, file, indent=2)

    print(
        f"Saved {len(lp_data)} pools ({len(lp_data) - previous_pool_count} new)"
    )

Liquidity Fetcher

I’ve included an improvement here that uses a single mocked V3LiquidityPool to do the external updates instead of building a new object each time.

ethereum_uniswapv3_liquidity_events_fetcher.py

import json
import sys
from threading import Lock
from typing import Dict

import brownie
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params

import degenbot as bot
from degenbot.uniswap.v3.v3_liquidity_pool import (
    UniswapV3BitmapAtWord,
    UniswapV3LiquidityAtTick,
    UniswapV3PoolExternalUpdate,
)

BROWNIE_NETWORK = "mainnet-local"
SNAPSHOT_FILENAME = "ethereum_v3_liquidity_snapshot.json"
UNISWAPV3_START_BLOCK = 12_369_621

TICKSPACING_BY_FEE: Dict = {
    100: 1,
    500: 10,
    3000: 60,
    10000: 200,
}


class MockV3LiquidityPool(bot.V3LiquidityPool):
    def __init__(self):
        pass


def prime_pools():
    print("Starting pool primer")

    liquidity_snapshot: Dict[str, Dict] = {}

    lp_data: Dict[str, Dict] = {}

    for path in [
        "ethereum_lps_sushiswapv3.json",
        "ethereum_lps_uniswapv3.json",
    ]:
        try:
            with open(path, "r") as file:
                l = json.load(file)
            for lp in l:
                lp_data[lp["pool_address"]] = lp
        except Exception as e:
            print(e)

    try:
        with open(SNAPSHOT_FILENAME, "r") as file:
            json_liquidity_snapshot = json.load(file)
    except:
        snapshot_last_block = None
    else:
        snapshot_last_block = json_liquidity_snapshot.pop("snapshot_block")
        print(
            f"Loaded LP snapshot: {len(json_liquidity_snapshot)} pools @ block {snapshot_last_block}"
        )

        assert (
            snapshot_last_block < newest_block
        ), f"Aborting, snapshot block ({snapshot_last_block}) is newer than current chain height ({newest_block})"

        # Transform the JSON-encoded info from the snapshot to the dataclass
        # used by V3LiquidityPool
        for pool_address, snapshot in json_liquidity_snapshot.items():
            liquidity_snapshot[pool_address] = {
                "tick_bitmap": {
                    int(k): UniswapV3BitmapAtWord(**v)
                    for k, v in snapshot["tick_bitmap"].items()
                },
                "tick_data": {
                    int(k): UniswapV3LiquidityAtTick(**v)
                    for k, v in snapshot["tick_data"].items()
                },
            }

    V3LP = brownie.web3.eth.contract(
        abi=bot.uniswap.v3.abi.UNISWAP_V3_POOL_ABI
    )

    liquidity_events = {}

    for event in [V3LP.events.Mint, V3LP.events.Burn]:
        print(f"processing {event.event_name} events")

        start_block = (
            max(UNISWAPV3_START_BLOCK, snapshot_last_block + 1)
            if snapshot_last_block is not None
            else UNISWAPV3_START_BLOCK
        )
        block_span = 10_000
        done = False

        event_abi = event._get_event_abi()

        while not done:
            end_block = min(newest_block, start_block + block_span)

            _, event_filter_params = construct_event_filter_params(
                event_abi=event_abi,
                abi_codec=brownie.web3.codec,
                argument_filters={},
                fromBlock=start_block,
                toBlock=end_block,
            )

            try:
                event_logs = brownie.web3.eth.get_logs(event_filter_params)
            except:
                block_span = int(0.75 * block_span)
                continue

            for event in event_logs:
                decoded_event = get_event_data(
                    brownie.web3.codec, event_abi, event
                )

                pool_address = decoded_event["address"]
                block = decoded_event["blockNumber"]
                tx_index = decoded_event["transactionIndex"]
                liquidity = decoded_event["args"]["amount"] * (
                    -1 if decoded_event["event"] == "Burn" else 1
                )
                tick_lower = decoded_event["args"]["tickLower"]
                tick_upper = decoded_event["args"]["tickUpper"]

                # skip zero liquidity events
                if liquidity == 0:
                    continue

                try:
                    liquidity_events[pool_address]
                except KeyError:
                    liquidity_events[pool_address] = []

                liquidity_events[pool_address].append(
                    (
                        block,
                        tx_index,
                        (
                            liquidity,
                            tick_lower,
                            tick_upper,
                        ),
                    )
                )

            print(f"Fetched events: block span [{start_block},{end_block}]")

            if end_block == newest_block:
                done = True
            else:
                start_block = end_block + 1
                block_span = int(1.05 * block_span)

    lp_helper = MockV3LiquidityPool()
    lp_helper.sparse_bitmap = False
    lp_helper._liquidity_lock = Lock()
    lp_helper._slot0_lock = Lock()
    lp_helper._update_log = list()

    for pool_address in liquidity_events.keys():
        # Ignore all pool addresses not held in the LP data dict. A strange
        # pool (0x820e891b14149e98b48b39ee2667157Ef750539b) was triggering an
        # early termination because it had liquidity events, but was not
        # associated with the known factories.
        if not lp_data.get(pool_address):
            continue

        try:
            previous_snapshot_tick_data = liquidity_snapshot[pool_address][
                "tick_data"
            ]
        except KeyError:
            previous_snapshot_tick_data = {}

        try:
            previous_snapshot_tick_bitmap = liquidity_snapshot[pool_address][
                "tick_bitmap"
            ]
        except KeyError:
            previous_snapshot_tick_bitmap = {}

        lp_helper.address = "0x0000000000000000000000000000000000000000"
        lp_helper.liquidity = 1 << 256
        lp_helper.tick_data = previous_snapshot_tick_data
        lp_helper.tick_bitmap = previous_snapshot_tick_bitmap
        lp_helper.update_block = snapshot_last_block or UNISWAPV3_START_BLOCK
        lp_helper.liquidity_update_block = (
            snapshot_last_block or UNISWAPV3_START_BLOCK
        )
        lp_helper.tick = 0
        lp_helper.fee = lp_data[pool_address]["fee"]
        lp_helper.tick_spacing = TICKSPACING_BY_FEE[lp_helper.fee]

        sorted_liquidity_events = sorted(
            liquidity_events[pool_address],
            key=lambda event: (event[0], event[1]),
        )

        for liquidity_event in sorted_liquidity_events:
            (
                event_block,
                _,
                (liquidity_delta, tick_lower, tick_upper),
            ) = liquidity_event

            # Push the liquidity events into the mock helper
            lp_helper.external_update(
                update=UniswapV3PoolExternalUpdate(
                    block_number=event_block,
                    liquidity_change=(
                        liquidity_delta,
                        tick_lower,
                        tick_upper,
                    ),
                ),
            )

        # After all events have been pushed, update the liquidity snapshot with
        # the full liquidity data from the helper
        try:
            liquidity_snapshot[pool_address]
        except KeyError:
            liquidity_snapshot[pool_address] = {
                "tick_bitmap": {},
                "tick_data": {},
            }

        liquidity_snapshot[pool_address]["tick_bitmap"].update(
            lp_helper.tick_bitmap
        )
        liquidity_snapshot[pool_address]["tick_data"].update(
            lp_helper.tick_data
        )

    for pool_address in liquidity_snapshot:
        # Convert all liquidity data to JSON format so it can be exported
        liquidity_snapshot[pool_address] = {
            "tick_data": {
                key: value.to_dict()
                for key, value in liquidity_snapshot[pool_address][
                    "tick_data"
                ].items()
            },
            "tick_bitmap": {
                key: value.to_dict()
                for key, value in liquidity_snapshot[pool_address][
                    "tick_bitmap"
                ].items()
                if value.bitmap  # skip empty bitmaps
            },
        }

    liquidity_snapshot["snapshot_block"] = newest_block

    with open(SNAPSHOT_FILENAME, "w") as file:
        json.dump(
            liquidity_snapshot,
            file,
            indent=2,
            sort_keys=True,
        )
        print("Wrote LP snapshot")


try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit(
        "Could not connect! Verify your Brownie network settings using 'brownie networks list'"
    )

newest_block = brownie.chain.height

if __name__ == "__main__":
    prime_pools()
    print("Complete")

Pool Parser

ethereum_parser_2pool.py

import json
import web3
import networkx as nx
import itertools

WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

BLACKLISTED_TOKENS = [
    # add addresses here if you want to exclude a token from consideration during pathfinding
    # e.g.
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7",  # USDT
]

sushiv2_lp_data = {}
univ2_lp_data = {}
sushiv3_lp_data = {}
univ3_lp_data = {}

with open("ethereum_lps_sushiswapv2.json") as file:
    for pool in json.load(file):
        sushiv2_lp_data[pool.get("pool_address")] = {
            key: value for key, value in pool.items() if key not in ["pool_id"]
        }
print(f"Found {len(sushiv2_lp_data)} Sushiswap V2 pools")

with open("ethereum_lps_uniswapv2.json") as file:
    for pool in json.load(file):
        univ2_lp_data[pool.get("pool_address")] = {
            key: value for key, value in pool.items() if key not in ["pool_id"]
        }
print(f"Found {len(univ2_lp_data)} Uniswap V2 pools")


with open("ethereum_lps_sushiswapv3.json") as file:
    for pool in json.load(file):
        sushiv3_lp_data[pool.get("pool_address")] = {
            key: value
            for key, value in pool.items()
            if key not in ["block_number"]
        }
print(f"Found {len(sushiv3_lp_data)} Sushiswap V3 pools")


with open("ethereum_lps_uniswapv3.json") as file:
    for pool in json.load(file):
        univ3_lp_data[pool.get("pool_address")] = {
            key: value
            for key, value in pool.items()
            if key not in ["block_number"]
        }
print(f"Found {len(univ3_lp_data)} Uniswap V3 pools")


# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in univ2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in sushiv2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="SushiswapV2",
    )

for pool in univ3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )


for pool in sushiv3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="SushiswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding two-pool arbitrage paths ***")
two_pool_arb_paths = {}

for token in all_tokens_with_weth_pool:
    pools = G.get_edge_data(token, WETH_ADDRESS).values()

    # skip tokens with only one pool
    if len(pools) < 2:
        continue

    for pool_a, pool_b in itertools.permutations(pools, 2):
        pool_a_address = pool_a["lp_address"]
        pool_b_address = pool_b["lp_address"]
        pool_a_type = pool_a["pool_type"]
        pool_b_type = pool_b["pool_type"]

        if pool_a_type == "UniswapV2":
            pool_a_dict = univ2_lp_data[pool_a_address]
        elif pool_a_type == "SushiswapV2":
            pool_a_dict = sushiv2_lp_data[pool_a_address]
        elif pool_a_type == "UniswapV3":
            pool_a_dict = univ3_lp_data[pool_a_address]
        elif pool_a_type == "SushiswapV3":
            pool_a_dict = sushiv3_lp_data[pool_a_address]
        else:
            raise Exception(f"could not identify pool {pool_a}")

        if pool_b_type == "UniswapV2":
            pool_b_dict = univ2_lp_data[pool_b_address]
        elif pool_b_type == "SushiswapV2":
            pool_b_dict = sushiv2_lp_data[pool_b_address]
        elif pool_b_type == "UniswapV3":
            pool_b_dict = univ3_lp_data[pool_b_address]
        elif pool_b_type == "SushiswapV3":
            pool_b_dict = sushiv3_lp_data[pool_b_address]
        else:
            raise Exception(f"could not identify pool {pool_b}")

        two_pool_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [
                            pool_a.get("lp_address")[2:],
                            pool_b.get("lp_address")[2:],
                        ]
                    )
                ).hex()
            ),
            "pools": {
                pool_a.get("lp_address"): pool_a_dict,
                pool_b.get("lp_address"): pool_b_dict,
            },
            "arb_types": ["cycle", "flash_borrow_lp_swap"],
            "path": [pool.get("lp_address") for pool in [pool_a, pool_b]],
        }
print(f"Found {len(two_pool_arb_paths)} unique two-pool arbitrage paths")

print("• Saving arb paths to JSON")
with open("ethereum_arbs_2pool.json", "w") as file:
    json.dump(two_pool_arb_paths, file, indent=2)

ethereum_parser_3pool.py

import json
import web3
import networkx as nx
import itertools
import time

WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

start_timer = time.perf_counter()

BLACKLISTED_TOKENS = [
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7",  # USDT
]

v2_lp_data = {}
for filename in [
    "ethereum_lps_sushiswapv2.json",
    "ethereum_lps_uniswapv2.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v2_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["pool_id"]
            }
print(f"Found {len(v2_lp_data)} V2 pools")

v3_lp_data = {}
for filename in [
    "ethereum_lps_sushiswapv3.json",
    "ethereum_lps_uniswapv3.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v3_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["block_number"]
            }
print(f"Found {len(v3_lp_data)} V3 pools")

all_v2_pools = set(v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())

all_tokens = set(
    [lp.get("token0") for lp in v2_lp_data.values()]
    + [lp.get("token1") for lp in v2_lp_data.values()]
    + [lp.get("token0") for lp in v3_lp_data.values()]
    + [lp.get("token1") for lp in v3_lp_data.values()]
)


# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in v2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in v3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding triangular arbitrage paths ***")
triangle_arb_paths = {}

# only consider tokens with degree > 1 (number of pools holding the token)
filtered_tokens = [
    token for token in all_tokens_with_weth_pool if G.degree(token) > 1
]
print(f"Processing {len(filtered_tokens)} tokens with degree > 1")

# loop through all possible token pair
for token_a, token_b in itertools.combinations(filtered_tokens, 2):
    # find tokenA/tokenB pools, skip if a tokenA/tokenB pool is not found
    if not G.get_edge_data(token_a, token_b):
        continue

    inside_pools = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, token_b).values()
    ]

    # find tokenA/WETH pools
    outside_pools_tokenA = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, WETH_ADDRESS).values()
    ]

    # find tokenB/WETH pools
    outside_pools_tokenB = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_b, WETH_ADDRESS).values()
    ]

    # find all triangular arbitrage paths of form:
    # tokenA/WETH -> tokenA/tokenB -> tokenB/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenA, inside_pools, outside_pools_tokenB
    ):
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v2_lp_data.get(pool_address).items()
                    }
                }
            elif pool_address in all_v3_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v3_lp_data.get(pool_address).items()
                    }
                }
            else:
                raise Exception
            pool_data.update(pool_info)

        triangle_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [pool_address[2:] for pool_address in pool_addresses]
                    )
                ).hex()
            ),
            "path": pool_addresses,
            "pools": pool_data,
        }

    # find all triangular arbitrage paths of form:
    # tokenB/WETH -> tokenA/tokenB -> tokenA/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenB, inside_pools, outside_pools_tokenA
    ):
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v2_lp_data.get(pool_address).items()
                    }
                }
            elif pool_address in all_v3_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v3_lp_data.get(pool_address).items()
                    }
                }
            else:
                raise Exception
            pool_data.update(pool_info)

        triangle_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [pool_address[2:] for pool_address in pool_addresses]
                    )
                ).hex()
            ),
            "path": pool_addresses,
            "pools": pool_data,
        }

print(
    f"Found {len(triangle_arb_paths)} triangle arb paths in {time.perf_counter() - start_timer:.1f}s"
)

print("• Saving arb paths to JSON")
with open("ethereum_arbs_3pool.json", "w") as file:
    json.dump(triangle_arb_paths, file, indent=2)

Smart Contract

ethereum_executor_v3_balancecheck.vy

# @version ^0.3

from vyper.interfaces import ERC20 as IERC20

interface IWETH:    
    def deposit(): payable

interface IUniswapV2Pool:
    def factory() -> address: view
    def swap(
        amount0Out: uint256, 
        amount1Out: uint256, 
        to: address, 
        data: Bytes[1024]
        ): nonpayable
    def token0() -> address: view
    def token1() -> address: view

interface IUniswapV3Pool:
    def factory() -> address: view
    def fee() -> uint24: view
    def tickSpacing() -> int24: view
    def token0() -> address: view
    def token1() -> address: view
    def maxLiquidityPerTick() -> uint128: view
    def swap(
        recipient: address, 
        zeroForOne: bool, 
        amountSpecified: int256, 
        sqrtPriceLimitX96: uint160, 
        data: Bytes[32]
        ) -> (int256, int256): nonpayable

OWNER_ADDR: immutable(address)
WETH_ADDR: constant(address) = 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2
POOL_INIT_CODE_HASH: constant(bytes32) = 0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54

MAX_PAYLOADS: constant(uint256) = 16
MAX_PAYLOAD_BYTES: constant(uint256) = 1024

struct payload:
    target: address
    calldata: Bytes[MAX_PAYLOAD_BYTES]
    value: uint256


@external
@payable
def __init__():
    OWNER_ADDR = msg.sender

    # wrap initial Ether to WETH
    if msg.value > 0:
        IWETH(WETH_ADDR).deposit(value=msg.value)


@external
@payable
def execute_payloads(
    payloads: DynArray[payload, MAX_PAYLOADS],
    balance_check: bool = True,
):
    assert msg.sender == OWNER_ADDR, "!OWNER"

    weth_balance_before: uint256 = empty(uint256)

    if balance_check:
        weth_balance_before = IERC20(WETH_ADDR).balanceOf(self)

    for _payload in payloads:
        raw_call(
            _payload.target,
            _payload.calldata,
            value=_payload.value,
        )

    if balance_check:
        assert IERC20(WETH_ADDR).balanceOf(self) > weth_balance_before, "WETH BALANCE REDUCTION"


@external
@payable
def uniswapV3SwapCallback(
    amount0_delta: int256,
    amount1_delta: int256,
    data: Bytes[32]
):
    # reject callbacks that did not originate from the owner's EOA
    assert tx.origin == OWNER_ADDR, "!OWNER"

    assert amount0_delta > 0 or amount1_delta > 0, "REJECTED 0 LIQUIDITY SWAP"

    # get the token0/token1 addresses and fee reported by msg.sender
    factory: address = IUniswapV3Pool(msg.sender).factory()   
    token0: address = IUniswapV3Pool(msg.sender).token0()
    token1: address = IUniswapV3Pool(msg.sender).token1()
    fee: uint24 = IUniswapV3Pool(msg.sender).fee()

    assert msg.sender == convert(
        slice(
            keccak256(
                concat(
                    0xFF,
                    convert(factory,bytes20),
                    keccak256(_abi_encode(token0, token1, fee)),
                    POOL_INIT_CODE_HASH,
                )
            ),
            12,
            20,
        ),
        address
    ), "INVALID V3 LP ADDRESS"

    # repay token back to pool
    if amount0_delta > 0:
        IERC20(token0).transfer(msg.sender,convert(amount0_delta, uint256))
    else:
        IERC20(token1).transfer(msg.sender,convert(amount1_delta, uint256))


@external
@payable
def __default__():
    # accept basic Ether transfers to the contract with no calldata
    if len(msg.data) == 0:
        return
    # revert on all other calls
    else:
        raise

Mempool Backrun Bot

ethereum_flashbots_backrun.py

This post is for paid subscribers

Already a paid subscriber? Sign in
© 2025 BowTiedDevil
Privacy ∙ Terms ∙ Collection notice
Start writingGet the app
Substack is the home for great culture

Share