Flashbots — Project Improvements: Stateful LP Fetcher, WETH Cycle Arbitrage Helper, Balance Updater, Local Gas Estimation, Relay Retry, V3 Multicall Decoder, Unified Submitter, Bundle Recorder
More Features For the UniswapV2 Flashbots Framework
I regularly make modifications and test new features for my bots, and I wrote the first project improvement roll-up post with added features for the UniswapV2 Flashbots Framework a month after publishing the initial bot code.
If you’ve been using that for a while, you’ve likely seen a lot of MEV opportunities, submitted a lot of bundles, and seen a ton of console text flying around. I hope you’ve landed some arbs!
There are several new features I’ve added to make running this bot framework easier.
Stateful LP Fetcher
The first is an updated LP fetcher that finds the last-known pool at startup and resumes from there instead of fetching the full list of LPs every time. This is a big time savings, especially on a popular DEX like UniswapV2.
This uses JSON as the data storage format, and is set to use SushiSwap and UniswapV2 in this example.
BUGFIX 2022-11-09: added a missing .abi attribute to line LP_ABI = Contract(factory.allPairs(0))
ethereum_lp_fetcher_json.py
from brownie import network, Contract
import sys
import os
import json
import web3

BROWNIE_NETWORK = "mainnet-local"
os.environ["ETHERSCAN_TOKEN"] = "[CHANGE ME]"

try:
    network.connect(BROWNIE_NETWORK)
except Exception:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "SushiSwap",
        "filename": "ethereum_sushiswap_lps.json",
        "factory_address": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac",
    },
    {
        "name": "Uniswap V2",
        "filename": "ethereum_uniswapv2_lps.json",
        "factory_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
    },
]

w3 = web3.Web3(web3.WebsocketProvider())

for name, factory_address, filename in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
    )
    for exchange in exchanges
]:

    print(f"DEX: {name}")

    # load pools saved by a previous run, if any
    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        print(f"Found previous LP data: {len(lp_data)} pools")

    # set unconditionally so the comparison below also works on a first run
    previous_pool_count = len(lp_data)

    try:
        factory = Contract(factory_address)
    except Exception:
        try:
            factory = Contract.from_explorer(factory_address)
        except Exception:
            factory = None
    finally:
        if factory is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    # Retrieve ABI for typical LP deployed by this factory
    try:
        LP_ABI = Contract(factory.allPairs(0)).abi
    except Exception:
        try:
            LP_ABI = Contract.from_explorer(factory.allPairs(0)).abi
        except Exception:
            LP_ABI = None
    finally:
        if LP_ABI is None:
            sys.exit("LP ABI COULD NOT BE LOADED")

    # count the number of pairs tracked by the factory
    pool_count = int(factory.allPairsLength())

    if pool_count == previous_pool_count:
        print("No new pools\n")
        continue

    print(f"Found {pool_count - len(lp_data)} new pools")

    # retrieve pool addresses found from the factory,
    # starting at the first pool ID that is not already saved
    print("• Fetching LP addresses and token data")
    for pool_id in range(
        len(lp_data),
        pool_count,
        1,
    ):
        lp_dict = {}

        lp_address = factory.allPairs(pool_id)

        w3_pool = w3.eth.contract(
            address=lp_address,
            abi=LP_ABI,
        )

        token0 = w3_pool.functions.token0().call()
        token1 = w3_pool.functions.token1().call()

        lp_dict["pool_id"] = pool_id
        lp_dict["pool_address"] = lp_address
        lp_dict["token0"] = token0
        lp_dict["token1"] = token1

        lp_data.append(lp_dict)

    print("• Saving pool data to JSON\n")
    with open(filename, "w") as file:
        json.dump(lp_data, file)
WETH Cycle Arbitrage Helper
We have spent a lot of time learning how to do flash borrow arbitrage, but many everyday arbs are on the order of a few dollars at a time, and these can be snagged only if you have an efficient method that minimizes gas use.
The most efficient arbitrage takes a starting balance of some commonly-held wrapped token and swaps across two pools with shared tokens. You see these all the time on the Flashbots Explorer. Swap WETH into pool A, swap the resulting token into pool B, keep the profit. Simple!
I have built an arb helper for just this case called LpSwapWithFuture, which you can find HERE or by pulling/cloning the degenbot code base.
It is a simple arb helper that supports overriding reserves for mempool TX, so if you’re already working with other helpers you’ll feel comfortable with this one.
It relies on your arb contract holding a balance of WETH, so if you have no money don’t bother with this one. You can chase most of the “peanut sized” cycle arbs with 1-2 WETH, so don’t get discouraged.
Cycle arb helpers can be built from the existing 2pool and triangle arb JSON paths you already have. It’s a bit hacky but works well until I can build a more sophisticated parser that really makes this robust.
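To make the WETH cycle concrete, here is a minimal sketch of the math behind a two-pool cycle. It assumes both pools are UniswapV2-style constant-product pools with the standard 0.3% fee. The function names and the simple grid search are hypothetical illustrations and are not taken from the LpSwapWithFuture helper.

```python
def get_amount_out(amount_in: int, reserve_in: int, reserve_out: int) -> int:
    """UniswapV2-style swap output with a 0.3% fee, using integer math."""
    amount_in_with_fee = amount_in * 997
    numerator = amount_in_with_fee * reserve_out
    denominator = reserve_in * 1000 + amount_in_with_fee
    return numerator // denominator


def cycle_profit(weth_in: int, pool_a: tuple, pool_b: tuple) -> int:
    """
    Profit (in wei of WETH) from swapping WETH -> token in pool A,
    then token -> WETH in pool B.

    pool_a = (weth_reserve, token_reserve)
    pool_b = (token_reserve, weth_reserve)
    """
    token_out = get_amount_out(weth_in, pool_a[0], pool_a[1])
    weth_out = get_amount_out(token_out, pool_b[0], pool_b[1])
    return weth_out - weth_in


def best_input(max_input: int, pool_a: tuple, pool_b: tuple, steps: int = 100):
    """
    Hypothetical coarse search: try evenly spaced inputs up to max_input
    (the contract's WETH balance) and keep the most profitable one.
    """
    candidates = [max_input * i // steps for i in range(1, steps + 1)]
    best = max(candidates, key=lambda x: cycle_profit(x, pool_a, pool_b))
    return best, cycle_profit(best, pool_a, pool_b)
```

With equal prices in both pools the cycle loses money to fees, so a profit only appears when the token is priced higher in pool B than in pool A. The input is capped at max_input because the cycle is funded from the contract's own WETH balance.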
Bot WETH Balance Updater
Since we are going to be chasing cycle arbs, which are limited by the WETH balance of our contract, we need a way to track our WETH balance and update the arb helpers whenever it increases (usually from a successful arb, but a direct transfer is valid, too).
This simple function checks the contract's WETH balance once per new block and, when the balance has changed, updates the cycle arb helpers with the new value.
async def track_balance():

    # relies on globals defined elsewhere in the bot:
    # newest_block, weth, arb_contract, degenbot_cycle_arbs
    weth_balance = 0
    last_block = newest_block

    while True:

        await asyncio.sleep(0)

        # check for balance updates once per block
        if last_block == newest_block:
            continue

        try:
            balance = weth.balanceOf(arb_contract.address)
        except Exception as e:
            print(f"track_balance: {e}")
        else:
            if balance != weth_balance:
                print()
                print(f"Updated balance: {balance/(10**18):.3f} WETH")
                print()
                weth_balance = balance
                # cap every cycle arb helper at the new balance
                for arb in degenbot_cycle_arbs.values():
                    arb.max_input = weth_balance
        finally:
            last_block = newest_block
Local Gas Estimation
If you’re running a local Ethereum node (you should be), it is trivial to test potential arbitrage transactions for estimated gas consumption. We can test through the Flashbots relay as well, but this is much slower even on a relatively fast broadband connection. Much better to use our local node wherever possible, and to maintain a record of that gas use to avoid unnecessary re-tests.
This function allows us to simulate a basic cycle arb locally, and return the status of that simulation. If the simulation fails too many times, the arb can be dropped and blacklisted.
async def test_onchain_arb_gas(
    arb_dict: dict,
    arb_id=None,
):

    global arb_simulations

    # get a pointer to the arb helper
    arb_helper = degenbot_cycle_arbs.get(arb_id)

    if VERBOSE_TIMING:
        start = time.monotonic()
        print("starting test_onchain_arb")

    tx_params = {
        "from": bot_account.address,
        "chainId": brownie.chain.id,
        "nonce": bot_account.nonce,
    }

    swap_amount = arb_dict.get("swap_amount")

    # generate payloads for these steps: transfer, swap*
    # *multiple swap payloads may be needed
    try:
        # transfer the input token to the first swap pool
        transfer_payload = [
            # address
            input_token.address,
            # bytes calldata
            w3.keccak(text="transfer(address,uint256)")[0:4]
            + eth_abi.encode(
                [
                    "address",
                    "uint256",
                ],
                [
                    arb_dict.get("swap_pool_addresses")[0],
                    swap_amount,
                ],
            ),
            # msg.value
            0,
        ]
    except Exception as e:
        print(f"transfer_payload: {e}")
        print(arb_dict)
        print(f"id: {arb_id}")
        return

    try:
        swap_payloads = []
        swap_steps = len(arb_dict.get("swap_pool_addresses"))
        for i, swap_pool_address in enumerate(
            arb_dict.get("swap_pool_addresses")
        ):
            # use the next swap pool address as the destination unless
            # this is the last swap
            swap_destination = (
                arb_contract.address
                if i == swap_steps - 1
                else arb_dict.get("swap_pool_addresses")[i + 1]
            )
            swap_payloads.append(
                [
                    # address
                    swap_pool_address,
                    # bytes calldata
                    w3.keccak(
                        text="swap(uint256,uint256,address,bytes)"
                    )[0:4]
                    + eth_abi.encode(
                        [
                            "uint256",
                            "uint256",
                            "address",
                            "bytes",
                        ],
                        [
                            *arb_dict.get("swap_pool_amounts")[i],
                            swap_destination,
                            b"",
                        ],
                    ),
                    # msg.value
                    0,
                ]
            )
    except Exception as e:
        print(f"swap_payload: {e}")
        print(arb_dict)
        print(f"id: {arb_id}")
        return

    # simulate the transfer and swaps against the local node
    success, gas_estimate = test_gas(
        arb_helper,
        [transfer_payload, *swap_payloads],
        tx_params,
        arb_id=arb_helper.id,
    )

    if not success:
        return
    else:
        arb_helper.gas_estimate = gas_estimate

    if VERBOSE_TIMING:
        print(
            f"test_onchain_arb completed in {time.monotonic() - start:0.4f}s"
        )
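The drop-and-blacklist step mentioned before the function is not part of the function itself. Here is a minimal sketch of one way to do it, assuming a per-arb failure counter. The names MAX_SIMULATION_FAILURES, failed_simulations, blacklisted_arbs, and record_simulation_result are hypothetical and the threshold of 3 is arbitrary; none of them come from the bot source.

```python
MAX_SIMULATION_FAILURES = 3  # hypothetical threshold, adjust to taste

failed_simulations: dict = {}  # arb_id -> consecutive failure count
blacklisted_arbs: set = set()  # arb_ids that have been dropped


def record_simulation_result(arb_id, success: bool, active_arbs: dict) -> bool:
    """
    Track simulation failures for one arb.
    Returns True if the arb is still active, False if it was just dropped.
    """
    if success:
        # a good simulation clears the failure history
        failed_simulations.pop(arb_id, None)
        return True

    failed_simulations[arb_id] = failed_simulations.get(arb_id, 0) + 1

    if failed_simulations[arb_id] >= MAX_SIMULATION_FAILURES:
        # remove the helper so it is never tested again, and remember why
        active_arbs.pop(arb_id, None)
        blacklisted_arbs.add(arb_id)
        return False

    return True
```

In the bot, active_arbs would be the dictionary of cycle arb helpers, and the function would be called with the success flag returned by the gas test. Resetting the counter on success is a design choice that only punishes consecutive failures.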
This function relies on there being an actual opportunity to test, so there is a corresponding function that will continually look for cycle arb helpers that have both (1) no previous gas estimate set and (2) some positive expected profit, even if trivially small.
This function works hand-in-hand with the gas estimator, and in fact calls it directly.
async def activate_arbs():

    # pre-update all arbs, yielding to the event loop frequently to allow
    # the websocket to receive new blocks and keep pool states up to date
    for arb_helper in degenbot_cycle_arbs.values():
        await asyncio.sleep(0)
        arb_helper.update_reserves()

    while True:

        await asyncio.sleep(0)

        # wait until the watchers are running and the pools are current
        if (
            not status_new_blocks
            or not status_sync_events
            or not status_pools_updated
        ):
            continue

        # generator expression to identify all potential arbs that have
        # not been simulated for gas
        arbs_to_process = (
            arb_helper
            for arb_helper in degenbot_cycle_arbs.values()
            if not arb_helper.gas_estimate
            if arb_helper.best.get("profit_amount")
        )

        try:
            # get the first match only
            arb_helper = next(arbs_to_process)
        except StopIteration:
            continue

        # yield to the event loop, then check if the arb is still valid
        try:
            await asyncio.sleep(0)
            arb_helper.update_reserves()
        except Exception as e:
            print(f"estimate_arbs: {e}")
            print(type(e))
        else:
            if not arb_helper.best.get("profit_amount"):
                continue

            if VERBOSE_ACTIVATION:
                print(f"Gas test for arb: {arb_helper} ...")

            await test_onchain_arb_gas(
                arb_dict=arb_helper.best,
                arb_id=arb_helper.id,
            )
Relay Retry
The frustrating thing about the Flashbots Relay is that sometimes it can’t be reached. If you’ve found a really hot arb but the Relay throws some trivial error (connection reset, for example) and you miss it, you’re gonna feel real bad.
I’ve made some improvements to the simulator and bundle submission code blocks that will attempt to retry the relay up to 5 times (adjustable) whenever an error is encountered. It’s not sophisticated enough to stop on failed simulations, but this catches the easy “false negatives” when the relay goes AWOL for a bit.
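The retry idea can be sketched as a small wrapper. This is a simplified illustration, not the code from the bot: call_with_retry, RELAY_RETRIES, and the delay value are hypothetical names and settings, and relay_call stands in for whatever function actually talks to the relay (a simulation or a bundle submission).

```python
import asyncio

RELAY_RETRIES = 5  # adjustable, matching the "up to 5 times" default


async def call_with_retry(
    relay_call,
    *args,
    retries: int = RELAY_RETRIES,
    delay: float = 0.25,
    **kwargs,
):
    """
    Call relay_call(*args, **kwargs), retrying on any exception.
    Re-raises the last error once all attempts are used up.
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")

    for attempt in range(1, retries + 1):
        try:
            return relay_call(*args, **kwargs)
        except Exception as e:
            print(f"relay error (attempt {attempt}/{retries}): {e}")
            if attempt == retries:
                raise
            # yield to the event loop before trying again
            await asyncio.sleep(delay)
```

Like the approach described above, this retries on every exception, so a simulation that fails for a real reason is retried just like a dropped connection. Telling the two apart would require inspecting the error, which this sketch does not attempt.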
V3 Multicall Decoder
In the UniswapV3 Mempool Watcher lesson, we developed a simple transaction watcher that observed and decoded multicall transactions. As luck would have it, some of these multicall transactions are swaps through V2 pools that are being sent through the V3 front-end.
Other than the added difficulty of deciphering these multicall transactions, they can be handled as before and backrun to your heart’s content.
The V3 multicall section is a bit primitive (for now), and only processes the first V2 transaction it finds. Some users submit two or more V2 swaps via multicall, and it will not catch these until I make the handling more sophisticated.
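Catching every V2 swap in a multicall, instead of only the first, could look something like the sketch below. It assumes the inner multicall payloads have already been decoded into (function_name, params) tuples, which is a simplified, hypothetical shape and not the structure the bot uses. find_v2_swaps and V2_SWAP_FUNCTIONS are likewise illustrative names.

```python
# V2-style swap functions that can appear inside a V3 router multicall
V2_SWAP_FUNCTIONS = {
    "swapExactTokensForTokens",
    "swapTokensForExactTokens",
}


def find_v2_swaps(decoded_calls: list) -> list:
    """
    Return every V2 swap found in a decoded multicall, in original order.

    decoded_calls: list of (function_name, params) tuples, one per inner
    call (hypothetical shape, produced by whatever decoder you use).
    """
    return [
        (name, params)
        for name, params in decoded_calls
        if name in V2_SWAP_FUNCTIONS
    ]
```

Order matters here: a later swap executes against the reserves left behind by the earlier ones, so any reserve override for a backrun has to apply the swaps in sequence.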
Unified Submitter
I have also collapsed the relay execution into a single function that can assemble a bundle with an arb transaction and an optional backrun or frontrun from the mempool. This simplifies the logic and eliminates the need for separate functions to handle the various types.
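The core of a unified submitter is the ordering of transactions inside the bundle. Here is a minimal sketch of that one step, assuming raw signed transactions as bytes. The function name assemble_bundle and the position argument are hypothetical, and the "signed_transaction" key reflects my understanding of the bundle format accepted by the web3-flashbots library, so treat it as an assumption and check it against the version you run.

```python
def assemble_bundle(
    arb_tx_raw: bytes,
    mempool_tx_raw: bytes = None,
    position: str = "backrun",
) -> list:
    """
    Build an ordered bundle.

    - no mempool TX: the arb is sent alone
    - "backrun":  mempool TX first, arb second
    - "frontrun": arb first, mempool TX second
    """
    arb = {"signed_transaction": arb_tx_raw}

    if mempool_tx_raw is None:
        return [arb]

    mempool = {"signed_transaction": mempool_tx_raw}

    if position == "backrun":
        return [mempool, arb]
    if position == "frontrun":
        return [arb, mempool]

    raise ValueError(f"unknown position: {position}")
```

Keeping the ordering in one place means the simulation, submission, and recording steps can all take the same bundle list, whatever type of arb produced it.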
Bundle Recorder
If you’re going to be submitting bundles, wouldn’t you like to see what they were?
I have written a bundle recorder function that maintains a JSON dict of all successfully submitted bundles. This is very useful if you want to inspect a bundle later, because there is no central database to query for the bundles you’ve already sent.
The JSON dict is keyed by bundleHash, and the information inside each entry includes the block it targeted, the calldata for every transaction in the bundle, the arb identifier, and the time it was submitted.
Bundles targeting a mempool TX will include both the mempool TX and the arb TX, and the recorder will store multiple block targets if you chose to submit the bundle to more than one block.
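A recorder along these lines can be very small. The sketch below is an illustration under assumptions, not the function from the bot: the field names ("blocks", "transactions", "arb_id", "time") and the signature of record_bundle are hypothetical, chosen to match the description above.

```python
import json
import time


def record_bundle(
    filename: str,
    bundle_hash: str,
    target_blocks: list,
    transactions: list,
    arb_id: str,
) -> dict:
    """Add one submitted bundle to a JSON dict keyed by bundleHash."""

    # load the existing record, or start a new one on first use
    try:
        with open(filename) as file:
            bundles = json.load(file)
    except FileNotFoundError:
        bundles = {}

    bundles[bundle_hash] = {
        # one or more blocks the bundle was submitted for
        "blocks": list(target_blocks),
        # calldata for every TX, stored as hex so it survives JSON
        "transactions": [
            tx.hex() if isinstance(tx, (bytes, bytearray)) else tx
            for tx in transactions
        ],
        "arb_id": arb_id,
        "time": time.time(),
    }

    with open(filename, "w") as file:
        json.dump(bundles, file, indent=2)

    return bundles
```

Rewriting the whole file on every submission is simple and safe enough at bundle-submission frequency. If the file grows large, appending one JSON object per line would be a cheaper alternative.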
What to Expect
This version operates a bit differently from the earlier version. It is built to handle a lot of simultaneous arbs, and it’s very “chatty” with the default settings. Feel free to adjust the various VERBOSE_ options to reduce the visual spam.
Initially, it will look like the bot isn’t doing much as it updates all of the pools. However, once the update finishes, it will start testing arbs for gas estimates, processing mempool transactions, and really chugging along. You’ll also see A LOT of these messages:
Source Code
Please report bugs to me directly or in the Discord. Happy searching!
BUGFIX 2022-11-09:
Commented out a status check in watch_pending_transactions that caused it to skip processing transactions during pool updates.
Removed a +1 in the send_bundle and record_bundle steps, which was incrementing the target blocks “off by one”.
ethereum_flashbots_cycle.py