Degen Code

Flashbots — Project Improvements: Stateful LP Fetcher, WETH Cycle Arbitrage Helper, Balance Updater, Local Gas Estimation, Relay Retry, V3 Multicall Decoder, Unified Submitter, Bundle Recorder

More Features For the UniswapV2 Flashbots Framework

Nov 08, 2022

I regularly make modifications and test new features for my bots, and I wrote the first project improvement roll-up post with added features for the UniswapV2 Flashbots Framework a month after publishing the initial bot code.

If you’ve been using that for a while, you’ve likely seen a lot of MEV opportunities, submitted a lot of bundles, and seen a ton of console text flying around. I hope you’ve landed some arbs!

There are several new features I’ve added to make running this bot framework easier.

Stateful LP Fetcher

The first is an updated LP fetcher that loads the previously saved pool list at startup and resumes from the last-known pool, instead of fetching the full list of LPs every time. This is a big time savings, especially on popular DEXes like UniswapV2.

It uses JSON as the data storage format, and is set to fetch from SushiSwap and UniswapV2 in this example.

BUGFIX 2022-11-09: added a missing .abi attribute to line LP_ABI = Contract(factory.allPairs(0))

ethereum_lp_fetcher_json.py

from brownie import network, Contract
import sys
import os
import json
import web3

BROWNIE_NETWORK = "mainnet-local"

os.environ["ETHERSCAN_TOKEN"] = "[CHANGE ME]"

try:
    network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "SushiSwap",
        "filename": "ethereum_sushiswap_lps.json",
        "factory_address": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac",
    },
    {
        "name": "Uniswap V2",
        "filename": "ethereum_uniswapv2_lps.json",
        "factory_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
    },
]

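# with no arguments, WebsocketProvider uses web3's default websocket
# endpoint, which is expected to be a local node
w3 = web3.Web3(web3.WebsocketProvider())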

for name, factory_address, filename in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
    )
    for exchange in exchanges
]:

    print(f"DEX: {name}")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        print(f"Found previous LP data: {len(lp_data)} pools")

    previous_pool_count = len(lp_data)

    try:
        factory = Contract(factory_address)
    except:
        try:
            factory = Contract.from_explorer(factory_address)
        except:
            factory = None
    finally:
        if factory is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    # Retrieve ABI for typical LP deployed by this factory
    try:
        LP_ABI = Contract(factory.allPairs(0)).abi
    except:
        try:
            LP_ABI = Contract.from_explorer(factory.allPairs(0)).abi
        except:
            LP_ABI = None
    finally:
        if LP_ABI is None:
            sys.exit("LP ABI COULD NOT BE LOADED")

    # count the number of pairs tracked by the factory
    pool_count = int(factory.allPairsLength())

    if pool_count == previous_pool_count:
        print("No new pools\n")
        continue

    print(f"Found {pool_count - len(lp_data)} new pools")

    # retrieve pool addresses found from the factory
    print("• Fetching LP addresses and token data")
    for pool_id in range(
        len(lp_data),
        pool_count,
        1,
    ):
        lp_dict = {}

        lp_address = factory.allPairs(pool_id)
        w3_pool = w3.eth.contract(
            address=lp_address,
            abi=LP_ABI,
        )
        token0 = w3_pool.functions.token0().call()
        token1 = w3_pool.functions.token1().call()

        lp_dict["pool_id"] = pool_id
        lp_dict["pool_address"] = lp_address
        lp_dict["token0"] = token0
        lp_dict["token1"] = token1

        lp_data.append(lp_dict)

    print("• Saving pool data to JSON\n")
    with open(filename, "w") as file:
        json.dump(lp_data, file)

WETH Cycle Arbitrage Helper

We have spent a lot of time learning how to do flash borrow arbitrage, but many everyday arbs are on the order of a few dollars at a time, and these can be snagged only if you have an efficient method that minimizes gas use.

The most efficient arbitrage takes a starting balance of some commonly-held wrapped token and swaps across two pools with shared tokens. You see these all the time on the Flashbots Explorer. Swap WETH into pool A, swap the resulting token into pool B, keep the profit. Simple!
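
To make the arithmetic concrete, here is a minimal sketch of a two-pool WETH cycle. It assumes both pools are standard UniswapV2-style pairs with the 0.3% swap fee. The function names, the reserve values, and the brute-force input scan are illustrative only and are not taken from the degenbot code base.

```python
def get_amount_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """Constant-product output for an exact input, with the 0.3% UniswapV2 fee."""
    amount_in_with_fee = amount_in * 997
    numerator = amount_in_with_fee * reserve_out
    denominator = reserve_in * 1000 + amount_in_with_fee
    return numerator // denominator


def cycle_profit(weth_in: int, pool_a: tuple, pool_b: tuple) -> int:
    """
    Profit (in wei) from swapping WETH -> token in pool A, then
    token -> WETH in pool B. Each pool is (weth_reserve, token_reserve).
    """
    token_out = get_amount_out(weth_in, pool_a[0], pool_a[1])
    weth_out = get_amount_out(token_out, pool_b[1], pool_b[0])
    return weth_out - weth_in


def best_input(max_input: int, pool_a: tuple, pool_b: tuple, steps: int = 100) -> int:
    """Crude scan over evenly spaced inputs, capped at the available WETH balance."""
    candidates = (max_input * i // steps for i in range(1, steps + 1))
    return max(candidates, key=lambda x: cycle_profit(x, pool_a, pool_b))


# hypothetical reserves: the token is cheaper in pool A than in pool B
WEI = 10**18
pool_a = (100 * WEI, 200_000 * WEI)
pool_b = (100 * WEI, 190_000 * WEI)

amount = best_input(2 * WEI, pool_a, pool_b)
print(f"input:  {amount / WEI:.3f} WETH")
print(f"profit: {cycle_profit(amount, pool_a, pool_b) / WEI:.5f} WETH")
```

Running the cycle in the wrong direction (pool B first) loses money, which is why each pool pair is only worth tracking in the direction where the price gap exceeds the two swap fees.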

I have built an arb helper for just this case called LpSwapWithFuture, which you can find HERE or by pulling/cloning the degenbot code base.

It is a simple arb helper that supports overriding reserves for mempool TX, so if you’re already working with other helpers you’ll feel comfortable with this one.

It relies on your arb contract holding a balance of WETH, so if you have no money don’t bother with this one. You can chase most of the “peanut sized” cycle arbs with 1-2 WETH, so don’t get discouraged.

Cycle arb helpers can be built from the existing 2pool and triangle arb JSON paths you already have. It’s a bit hacky but works well until I can build a more sophisticated parser that really makes this robust.

Bot WETH Balance Updater

Since we are going to be chasing cycle arbs, which are limited by the WETH balance of our contract, we need a way to track our WETH balance and update the arb helpers whenever it changes (usually an increase from a successful arb, but a direct transfer in or out is valid, too).

This simple function runs on every new block, checking for an update, and when found, updates the cycle arb helpers with the new balance.

async def track_balance():

    weth_balance = 0
    last_block = newest_block

    while True:
        await asyncio.sleep(0)

        # check for balance updates once per block
        if last_block == newest_block:
            continue

        try:
            balance = weth.balanceOf(arb_contract.address)
        except Exception as e:
            print(f"track_balance: {e}")
        else:
            if balance != weth_balance:
                print()
                print(f"Updated balance: {balance/(10**18):.3f} WETH")
                print()
                weth_balance = balance
                for arb in degenbot_cycle_arbs.values():
                    arb.max_input = weth_balance
        finally:
            last_block = newest_block

Local Gas Estimation

If you’re running a local Ethereum node (you should be), it is trivial to test potential arbitrage transactions for estimated gas consumption. We can test through the Flashbots relay as well, but this is much slower even on a relatively fast broadband connection. Much better to use our local node wherever possible, and to maintain a record of that gas use to avoid unnecessary re-tests.

This function allows us to simulate a basic cycle arb locally, and return the status of that simulation. If the simulation fails too many times, the arb can be dropped and blacklisted.

async def test_onchain_arb_gas(
    arb_dict: dict,
    arb_id=None,
):

    global arb_simulations

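    # look up the arb helper; bail out if the ID is unknown
    arb_helper = degenbot_cycle_arbs.get(arb_id)
    if arb_helper is None:
        print(f"test_onchain_arb_gas: no arb helper with id {arb_id}")
        return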

    if VERBOSE_TIMING:
        start = time.monotonic()
        print("starting test_onchain_arb")

    tx_params = {
        "from": bot_account.address,
        "chainId": brownie.chain.id,
        "nonce": bot_account.nonce,
    }

    swap_amount = arb_dict.get("swap_amount")

    # generate payloads for these steps: transfer, swap*
    # *multiple swap payloads may be needed

    try:
        # transfer the input token to the first swap pool
        transfer_payload = [
            # address
            input_token.address,
            # bytes calldata
            w3.keccak(text="transfer(address,uint256)")[0:4]
            + eth_abi.encode(
                [
                    "address",
                    "uint256",
                ],
                [
                    arb_dict.get("swap_pool_addresses")[0],
                    swap_amount,
                ],
            ),
            # msg.value
            0,
        ]
    except Exception as e:
        print(f"transfer_payload: {e}")
        print(arb_dict)
        print(f"id: {arb_id}")
        return

    try:
        swap_payloads = []
        swap_steps = len(arb_dict.get("swap_pool_addresses"))

        for i, swap_pool_address in enumerate(
            arb_dict.get("swap_pool_addresses")
        ):

            # use the next swap pool address as the destination unless
            # this is the last swap
            swap_destination = (
                arb_contract.address
                if i == swap_steps - 1
                else arb_dict.get("swap_pool_addresses")[i + 1]
            )

            swap_payloads.append(
                [
                    # address
                    swap_pool_address,
                    # bytes calldata
                    w3.keccak(
                        text="swap(uint256,uint256,address,bytes)"
                    )[0:4]
                    + eth_abi.encode(
                        [
                            "uint256",
                            "uint256",
                            "address",
                            "bytes",
                        ],
                        [
                            *arb_dict.get("swap_pool_amounts")[i],
                            swap_destination,
                            b"",
                        ],
                    ),
                    # msg.value
                    0,
                ]
            )
    except Exception as e:
        print(f"swap_payload: {e}")
        print(arb_dict)
        print(f"id: {arb_id}")
        return

    success, gas_estimate = test_gas(
        arb_helper,
        [transfer_payload, *swap_payloads],
        tx_params,
        arb_id=arb_helper.id,
    )
    if not success:
        return
    else:
        arb_helper.gas_estimate = gas_estimate

    if VERBOSE_TIMING:
        print(
            f"test_onchain_arb completed in {time.monotonic() - start:0.4f}s"
        )

The gas estimator relies on there being an actual opportunity to test, so there is a companion function, activate_arbs, that continually looks for cycle arb helpers that have both (1) no previous gas estimate set and (2) some positive expected profit, even if trivially small. It works hand-in-hand with the gas estimator, and in fact calls it directly.

async def activate_arbs():

    # pre-update all arbs, yielding to the event loop frequently to allow the websocket to receive new blocks and keep pool states up to date
    for arb_helper in degenbot_cycle_arbs.values():
        await asyncio.sleep(0)
        arb_helper.update_reserves()

    while True:

        await asyncio.sleep(0)

        if (
            not status_new_blocks
            or not status_sync_events
            or not status_pools_updated
        ):
            continue

        # generator expression to identify all potential arbs that have not been simulated for gas
        arbs_to_process = (
            arb_helper
            for arb_helper in degenbot_cycle_arbs.values()
            if not arb_helper.gas_estimate
            if arb_helper.best.get("profit_amount")
        )

        try:
            # get the first match only
            arb_helper = next(arbs_to_process)
        except StopIteration:
            continue

        # yield to the event loop, then check if the arb is still valid
        try:
            await asyncio.sleep(0)
            arb_helper.update_reserves()
        except Exception as e:
            print(f"activate_arbs: {e}")
            print(type(e))
        else:
            if not arb_helper.best.get("profit_amount"):
                continue

            if VERBOSE_ACTIVATION:
                print(f"Gas test for arb: {arb_helper} ...")

            await test_onchain_arb_gas(
                arb_dict=arb_helper.best,
                arb_id=arb_helper.id,
            )
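
The functions above do not show how a repeatedly failing arb gets dropped and blacklisted. One possible approach is a simple failure counter, sketched below. The threshold, the container names, and record_simulation_result are all hypothetical and do not appear in the bot source.

```python
from collections import defaultdict

# hypothetical threshold: drop an arb after this many failed simulations
MAX_FAILED_SIMULATIONS = 3

failed_simulations = defaultdict(int)  # arb_id -> consecutive failures
blacklisted_arbs = set()  # arb IDs that should no longer be tested


def record_simulation_result(arb_id: str, success: bool) -> bool:
    """
    Track the outcome of a gas simulation.
    Returns True if the arb should stay active, False once it is blacklisted.
    """
    if arb_id in blacklisted_arbs:
        return False

    if success:
        # a passing simulation resets the failure count
        failed_simulations.pop(arb_id, None)
        return True

    failed_simulations[arb_id] += 1
    if failed_simulations[arb_id] >= MAX_FAILED_SIMULATIONS:
        blacklisted_arbs.add(arb_id)
        failed_simulations.pop(arb_id, None)
        return False
    return True
```

In a setup like this, the generator expression in activate_arbs would also need to skip any helper whose ID is in blacklisted_arbs, otherwise the same broken arb would be picked again on every pass.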

Relay Retry

The frustrating thing about the Flashbots Relay is that sometimes it can’t be reached. If you’ve found a really hot arb but the Relay throws some trivial error (connection reset, for example) and you miss it, you’re gonna feel real bad.

I’ve made some improvements to the simulator and bundle submission code blocks that will attempt to retry the relay up to 5 times (adjustable) whenever an error is encountered. It’s not sophisticated enough to stop on failed simulations, but this catches the easy “false negatives” when the relay goes AWOL for a bit.
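
The general shape of that retry logic can be sketched as a small wrapper. This is an illustration of the idea only: call_with_retry and its parameters are made-up names, and like the description above it retries on any exception instead of trying to tell a dead relay from a failed simulation.

```python
import time


def call_with_retry(func, *args, retries=5, delay=0.0, **kwargs):
    """
    Call func(*args, **kwargs), retrying on any exception.
    Re-raises the last exception if every attempt fails.
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")

    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            last_error = e
            print(f"relay attempt {attempt}/{retries} failed: {e}")
            if attempt < retries:
                time.sleep(delay)

    raise last_error
```

Keep the delay short or zero for bundle submission, since a bundle that arrives after its target block is worthless.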

V3 Multicall Decoder

In the UniswapV3 Mempool Watcher lesson, we developed a simple transaction watcher that observed and decoded multicall transactions. As luck would have it, some of these multicall transactions are swaps through V2 pools that are being sent through the V3 front-end.

Other than the added difficulty of deciphering these multicall transactions, they can be handled as before and backrun to your heart’s content.

The V3 multicall section is a bit primitive (for now), and only processes the first V2 transaction it finds. Some users submit two or more V2 swaps via multicall, and it will not catch these until I make the handling more sophisticated.

Unified Submitter

I have also collapsed the relay execution into a single function that can assemble a bundle with an arb transaction and an optional backrun or frontrun from the mempool. This simplifies the logic and eliminates the need for separate functions to handle the various types.
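
The ordering logic behind a unified submitter might look something like the sketch below. The function name and arguments are hypothetical. The dictionary keys follow the bundle entry format used by the web3-flashbots library as I understand it ("signed_transaction" for a raw mempool TX, "signer" plus "transaction" for a TX to be signed), so treat them as an assumption and check against the version you have installed.

```python
def assemble_bundle(arb_tx, signer, mempool_tx_raw=None, position="backrun"):
    """
    Build an ordered bundle list.
    - no mempool TX: the bundle holds only the arb
    - "backrun": mempool TX first, arb second
    - "frontrun": arb first, mempool TX second
    """
    arb_entry = {"signer": signer, "transaction": arb_tx}

    if mempool_tx_raw is None:
        return [arb_entry]

    mempool_entry = {"signed_transaction": mempool_tx_raw}

    if position == "backrun":
        return [mempool_entry, arb_entry]
    if position == "frontrun":
        return [arb_entry, mempool_entry]
    raise ValueError(f"unknown position: {position}")
```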

Bundle Recorder

If you’re going to be submitting bundles, wouldn’t you like to see what they were?

I have written a bundle recorder function that maintains a JSON dict of all successfully submitted bundles. This is very useful if you want to inspect a bundle later, because there is no central database to query for the bundles you’ve already sent.

The JSON dict is keyed by bundleHash, and the information inside each entry includes the block it targeted, the calldata for every transaction in the bundle, the arb identifier, and the time it was submitted.

Bundles targeting a mempool TX will include both the mempool TX and the arb TX, and each entry will store multiple block targets if you choose to submit the bundle for more than one block.
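
A minimal sketch of a recorder along those lines is shown below. The field names and the record_bundle signature are illustrative guesses, not the layout used in the actual source file.

```python
import json
import time
from pathlib import Path


def record_bundle(filename, bundle_hash, target_blocks, tx_calldata, arb_id):
    """
    Add one submitted bundle to a JSON dict keyed by bundleHash,
    creating the file if it does not exist yet.
    """
    path = Path(filename)
    try:
        bundles = json.loads(path.read_text())
    except FileNotFoundError:
        bundles = {}

    bundles[bundle_hash] = {
        "blocks": list(target_blocks),  # one or more target blocks
        "transactions": list(tx_calldata),  # calldata for each TX, in order
        "arb_id": arb_id,
        "time": time.time(),  # submission time (Unix timestamp)
    }

    path.write_text(json.dumps(bundles, indent=2))
    return bundles[bundle_hash]
```

Rewriting the whole file on every submission is fine at bot scale, and it keeps the record readable with any JSON viewer.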

What to Expect

This version operates a bit differently from the earlier version. It is built to handle a lot of simultaneous arbs, and it’s very “chatty” with the default settings. Feel free to adjust the various VERBOSE_ options to reduce the visual spam.

Initially, it will look like the bot isn’t doing much as it updates all of the pools. However, once the pool update finishes, it will start testing arbs for gas estimates, processing mempool transactions, and really chugging along. You’ll also see A LOT of these messages:

Source Code

Please report bugs to me directly or in the Discord. Happy searching!

BUGFIX 2022-11-09:

  • Commented out a status check in watch_pending_transactions that caused it to skip processing transactions during pool updates.

  • Removed a +1 in the send_bundle and record_bundle steps, which was incrementing the target blocks “off by one”

ethereum_flashbots_cycle.py

© 2025 BowTiedDevil