We made it! Thanks for sticking with me as I stumbled through the maze of V3 contracts, built some helpers, tested, tested, tested, then found and fixed all of the bugs*.
*some bugs likely remain, so I look forward to your pull requests and bug reports on Discord
First I will do a high level overview of the bot, then discuss some improvements I’ve made in various places, then share all of the code you need to start going after V3 pools.
Overview
This project searches for two-pool arbitrage opportunities on Ethereum mainnet by cycling WETH through Uniswap-based AMM pools. Both types of pools are supported (V2 and V3, including Sushiswap), and in any combination.
It is built on top of asyncio for coroutine-based concurrency, and makes heavy use of the websockets library to listen for events that track changes to pool states.
I built this project for those operating a local node. It will work if you have a 3rd party RPC, but overall latency will be higher and I expect you’ll run into rate limits and request caps if you are not careful.
It constructs transactions and builds bundles for submission via the Flashbots relay.
New Features
Asynchronous Startup: The bot now generates token, pool, and arbitrage helpers inside an async coroutine. The connection to the node is now established within seconds of the bot starting up (along with other coroutines that watch for new blocks and events). As arbitrage helpers are “hot loaded”, the bot will process and execute on them in the background. This means that the bot can continue to load new opportunities as it runs, which opens up the possibility of a long-running bot that is fed by another program.
Next-block Gas Prediction: The bot uses a nice method from web3py called
fee_history
that allows us to predict what the base fee for the next block will be, based on total gas utilization for the current block. No more guessing or adding “fudge factors” for the base fee, we always know if gas will go up or down and by how much, and can construct our transactions appropriately.Latency Watchdog: With so much concurrency, it’s common to see the bot get overloaded and process blocks too late to capture any opportunity. There is now a “watchdog” process that sets a “paused” flag whenever the next block has not arrived after some time delay. Other coroutines that consume time and effort can watch this flag and suspend their operation to allow the bot to catch up to the current state.
Single Event Watcher: Rather than starting and attempting to synchronize multiple connections to receive events, the bot now requests that all events be sent to it and filters them appropriately with various callback functions. There is now a built-in “soft timeout” feature that will wait for all events to be received before processing the batch completely to reduce the chance of generating an arb from an event in the middle of a block.
Smart Arb Selection: After processing all updates, the bot identifies only the arbs that were affected and recalculates their potential for profit.
Pool Refresher: A long-running coroutine periodically scans all liquidity pool helpers to identify any that were not updated before the new block and event watcher came online. For pools that are marked as externally-updated, this is critical to ensure that the local state we track matches the on-chain state. If either the block or event watcher disconnect, this health check will spring into action and force-update all of the “stale” pools.
Planned Features
The bot is limited in scope and will serve as a foundation for future improvements. These familiar components from previous UniswapV2 bots are not included:
Mempool-Aware Backrunning: this will be added after I incorporate future-state prediction into the V3LiquidityPool class. The function that builds and transmits bundles still has some arguments that will handle mempool-aware bundles in the future, but they do nothing in this iteration.
Flash Borrow Arbitrage: I have not developed a helper class that will calculate flash borrow arbitrage starting at a V3 pool.
Triangle+ Arbitrage: I have only tested this version with 2-pool arbitrage paths, i.e. arbitraging between two pools with the same token pair. The helper will support 3-or-more pools without much trouble, but I have not tested it thoroughly enough to responsibly share that code.
Helper Improvements
Beyond the improvements I’ve shared in previous posts, the V3LiquidityPool and UniswapLpCycle classes have been beefed up.
Here are a few highlights:
Better Exceptions: instead of raising generic Exceptions, now each class has a dedicated base exceptions (ArbitrageError, Erc20TokenError, LiquidityPoolError for example) that can be handled inside and outside of the class.
Flexible Pool Helper Updates: The V3 pool helper can be updated in two ways:
auto_update
will do a refresh of the pool state from the built-in Brownie contract. This is done at start-up, but does not have to be used afterwards.external_update
will accept a dictionary of updates with various items (tick, sqrt_price_x96, liquidity, liquidity_change) that adjust the pool state tracked by the helper.
User-Triggered Arbitrage Calculations: The
UniswapLpCycle
helper does not automatically calculates arbitrage values on updates. Theauto_update
andexternal_update
return an “updated” boolean that allows the bot builder to determine whether an arbitrage calculation should be initiated. e.g. if you run the updater and it returns False, you know there has been no change to the arbitrage potential. Consequently, there is a helpfulcalculate_arbitrage
method that returns a similar “profitable”True/False
flag.On-demand Payload Generation: I covered this in the previous post, but it’s too good not to mention. The arbitrage helper is responsible for generating its own payloads, which eliminates a lot of headache and code duplication.
Multicall Tick Fetching: For any Brownie network with the
multicall2
network key set, the V3LiquidityPool helper will attempt to pre-fetch extra ticks via multicall instead of doing them one-by-one.Zero-Liquidity Short Circuits: The V3 pool helper will raise various exceptions if it detects an arbitrage or calculation that cannot be performed. An example is if a pool has no liquidity in the direction of the proposed trade, it will raise a
ZeroLiquidityError
immediately instead of wasting time calculating an impossible result. TheUniswapLpCycle
arbitrage helper also detects these errors and raises the appropriateArbitrageError
when these are found. The proejct bot handles them appropriately by disregarding them from consideration until another pool state changes.Bug Fixes: A particularly nasty bug that was messing me up for almost a month is the behavior of floor division in Python, which rounds to negative infinity instead of toward zero (which matches EVM’s behavior). Very subtle differences in swap behavior resulted in a bunch of reverted transactions that stumped me for a long time. This and other are now fixed in the degenbot repo.
Now, on to the code!
V3-Compatible Payload Executor Contract
ethereum_executor_v3.vy
# @version ^0.3
OWNER_ADDR: immutable(address)
WETH_ADDR: constant(address) = 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2
V3_FACTORY: constant(address) = 0x1F98431c8aD98523631AE4a59f267346ea31F984
MAX_PAYLOADS: constant(uint256) = 16
MAX_PAYLOAD_BYTES: constant(uint256) = 1024
struct payload:
target: address
calldata: Bytes[MAX_PAYLOAD_BYTES]
value: uint256
@external
@payable
def __init__():
OWNER_ADDR = msg.sender
# wrap initial Ether to WETH
if msg.value > 0:
raw_call(
WETH_ADDR,
method_id('deposit()'),
value=msg.value
)
@external
@payable
def execute_payloads(
payloads: DynArray[payload, MAX_PAYLOADS],
):
assert msg.sender == OWNER_ADDR, "!OWNER"
for _payload in payloads:
raw_call(
_payload.target,
_payload.calldata,
value=_payload.value,
)
@internal
@pure
def verifyCallback(
tokenA: address,
tokenB: address,
fee: uint24
) -> address:
token0: address = tokenA
token1: address = tokenB
if convert(tokenA,uint160) > convert(tokenB,uint160):
token0 = tokenB
token1 = tokenA
return convert(
slice(
convert(
convert(
keccak256(
concat(
b'\xFF',
convert(V3_FACTORY,bytes20),
keccak256(
_abi_encode(
token0,
token1,
fee
)
),
0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54,
)
),
uint256
),
bytes32
),
12,
20,
),
address
)
@external
@payable
def uniswapV3SwapCallback(
amount0: int256,
amount1: int256,
data: Bytes[32]
):
# get the token0/token1 addresses and fee reported by msg.sender
token0: address = extract32(
raw_call(
msg.sender,
method_id('token0()'),
max_outsize=32,
),
0,
output_type=address
)
token1: address = extract32(
raw_call(
msg.sender,
method_id('token1()'),
max_outsize=32,
),
0,
output_type=address
)
fee: uint24 = extract32(
raw_call(
msg.sender,
method_id('fee()'),
max_outsize=32,
),
0,
output_type=uint24
)
assert msg.sender == self.verifyCallback(token0,token1,fee), "!V3LP"
# transfer token back to pool
if amount0 > 0:
raw_call(
token0,
_abi_encode(
msg.sender,
amount0,
method_id=method_id('transfer(address,uint256)')
)
)
elif amount1 > 0:
raw_call(
token1,
_abi_encode(
msg.sender,
amount1,
method_id=method_id('transfer(address,uint256)')
)
)
@external
@payable
def __default__():
# accept basic Ether transfers to the contract with no calldata
if len(msg.data) == 0:
return
# revert on all other calls
else:
raise
LP Fetchers
ethereum_lp_fetcher_uniswapv2_json.py
from brownie import network, Contract
import sys
import os
import json
import web3
BROWNIE_NETWORK = "mainnet-local"
os.environ["ETHERSCAN_TOKEN"] = "[redacted]"
# maximum blocks to process with getLogs
BLOCK_SPAN = 50_000
# number of pools to process at a time before flushing to disk
CHUNK_SIZE = 1000
try:
network.connect(BROWNIE_NETWORK)
except:
sys.exit("Could not connect!")
exchanges = [
{
"name": "SushiSwap",
"filename": "ethereum_sushiswap_lps.json",
"factory_address": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac",
"factory_deployment_block": 10_794_229,
},
{
"name": "Uniswap V2",
"filename": "ethereum_uniswapv2_lps.json",
"factory_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
"factory_deployment_block": 10_000_835,
},
]
# requires a local node running with the websocket endpoint exposed
# adjust this as needed if you are not using this configuration
w3 = web3.Web3(web3.WebsocketProvider())
current_block = w3.eth.get_block_number()
for name, factory_address, filename, deployment_block in [
(
exchange["name"],
exchange["factory_address"],
exchange["filename"],
exchange["factory_deployment_block"],
)
for exchange in exchanges
]:
print(f"DEX: {name}")
try:
factory_contract = Contract(factory_address)
except:
try:
factory_contract = Contract.from_explorer(factory_address)
except:
factory_contract = None
finally:
if factory_contract is None:
sys.exit("FACTORY COULD NOT BE LOADED")
try:
with open(filename) as file:
lp_data = json.load(file)
except FileNotFoundError:
lp_data = []
if lp_data:
previous_pool_count = len(lp_data)
print(f"Found previously-fetched data: {previous_pool_count} pools")
previous_block = lp_data[-1].get("block_number")
print(f"Found pool data up to block {previous_block}")
else:
previous_pool_count = 0
previous_block = deployment_block
for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
if i + BLOCK_SPAN > current_block:
end_block = current_block
else:
end_block = i + BLOCK_SPAN
if pool_created_events := factory_contract.events.PairCreated.getLogs(
fromBlock=i, toBlock=end_block
):
for event in pool_created_events:
lp_data.append(
{
"pool_address": event.args.get("pair"),
"token0": event.args.get("token0"),
"token1": event.args.get("token1"),
"block_number": event.get("blockNumber"),
"pool_id": event.args.get(""),
"type": "UniswapV2",
}
)
with open(filename, "w") as file:
json.dump(lp_data, file, indent=2)
ethereum_lp_fetcher_uniswapv3_json.py
from brownie import network, Contract
import sys
import os
import json
import web3
BROWNIE_NETWORK = "mainnet-local"
os.environ["ETHERSCAN_TOKEN"] = "[redacted]"
# maximum blocks to process with getLogs
BLOCK_SPAN = 50_000
FACTORY_DEPLOYMENT_BLOCK = 12369621
try:
network.connect(BROWNIE_NETWORK)
except:
sys.exit("Could not connect!")
exchanges = [
{
"name": "Uniswap V3",
"filename": "ethereum_uniswapv3_lps.json",
"factory_address": "0x1F98431c8aD98523631AE4a59f267346ea31F984",
},
]
# requires a local node running with the websocket endpoint exposed
# adjust this as needed if you are not using this configuration
w3 = web3.Web3(web3.WebsocketProvider())
current_block = w3.eth.get_block_number()
for name, factory_address, filename in [
(
exchange["name"],
exchange["factory_address"],
exchange["filename"],
)
for exchange in exchanges
]:
print(f"DEX: {name}")
try:
factory = Contract(factory_address)
except:
try:
factory = Contract.from_explorer(factory_address)
except:
factory = None
finally:
if factory is None:
sys.exit("FACTORY COULD NOT BE LOADED")
try:
with open(filename) as file:
lp_data = json.load(file)
except FileNotFoundError:
lp_data = []
if lp_data:
previous_block = lp_data[-1].get("block_number")
print(f"Found pool data up to block {previous_block}")
else:
previous_block = FACTORY_DEPLOYMENT_BLOCK
factory_contract = w3.eth.contract(
address=factory.address, abi=factory.abi
)
previously_found_pools = len(lp_data)
print(f"previously found {previously_found_pools} pools")
for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
if i + BLOCK_SPAN > current_block:
end_block = current_block
else:
end_block = i + BLOCK_SPAN
if pool_created_events := factory_contract.events.PoolCreated.getLogs(
fromBlock=i, toBlock=end_block
):
for event in pool_created_events:
lp_data.append(
{
"pool_address": event.args.pool,
"fee": event.args.fee,
"token0": event.args.token0,
"token1": event.args.token1,
"block_number": event.blockNumber,
"type": "UniswapV3",
}
)
with open(filename, "w") as file:
json.dump(lp_data, file, indent=2)
print(f"Saved {len(lp_data) - previously_found_pools} new pools")
2-Pool Arbitrage Builder
ethereum_parser_2pool_univ3.py
import json
import web3
import networkx as nx
import itertools
WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
w3 = web3.Web3()
v2_lp_data = {}
for filename in [
"ethereum_sushiswap_lps.json",
"ethereum_uniswapv2_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
v2_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["pool_id"]
}
print(f"Found {len(v2_lp_data)} V2 pools")
v3_lp_data = {}
for filename in [
"ethereum_uniswapv3_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
v3_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["block_number"]
}
print(f"Found {len(v3_lp_data)} V3 pools")
all_v2_pools = set(v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())
all_tokens = set(
[lp.get("token0") for lp in v2_lp_data.values()]
+ [lp.get("token1") for lp in v2_lp_data.values()]
+ [lp.get("token0") for lp in v3_lp_data.values()]
+ [lp.get("token1") for lp in v3_lp_data.values()]
)
# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in v2_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV2",
)
for pool in v3_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV3",
)
print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")
all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")
print("*** Finding two-pool arbitrage paths ***")
two_pool_arb_paths = {}
for token in all_tokens_with_weth_pool:
pools = G.get_edge_data(token, WETH_ADDRESS).values()
# skip tokens with only one pool
if len(pools) < 2:
continue
for pool_a, pool_b in itertools.permutations(pools, 2):
if pool_a.get("pool_type") == "UniswapV2":
pool_a_dict = v2_lp_data.get(pool_a.get("lp_address"))
elif pool_a.get("pool_type") == "UniswapV3":
pool_a_dict = v3_lp_data.get(pool_a.get("lp_address"))
else:
raise Exception(f"could not identify pool {pool_a}")
if pool_b.get("pool_type") == "UniswapV2":
pool_b_dict = v2_lp_data.get(pool_b.get("lp_address"))
elif pool_b.get("pool_type") == "UniswapV3":
pool_b_dict = v3_lp_data.get(pool_b.get("lp_address"))
else:
raise Exception(f"could not identify pool {pool_b}")
two_pool_arb_paths[id] = {
"id": (
id := w3.keccak(
hexstr="".join(
[
pool_a.get("lp_address")[2:],
pool_b.get("lp_address")[2:],
]
)
).hex()
),
"pools": {
pool_a.get("lp_address"): pool_a_dict,
pool_b.get("lp_address"): pool_b_dict,
},
"arb_types": ["cycle", "flash_borrow_lp_swap"],
"path": [pool.get("lp_address") for pool in [pool_a, pool_b]],
}
print(f"Found {len(two_pool_arb_paths)} unique two-pool arbitrage paths")
print("• Saving arb paths to JSON")
with open("ethereum_arbs_2pool_withv3.json", "w") as file:
json.dump(two_pool_arb_paths, file, indent=2)
2-Pool Arbitrage Bot
ethereum_flashbots_2pool_cycle_univ3.py