Building on the efforts of the Uniswap Transaction Prediction series (beginning at Part I, ending at Part VIII), this project demonstrates how to perform WETH cycle backrun arbitrage against mempool transactions sent to Uniswap routers.
There are several topics I want to cover:
Vyper contract performance improvements
Closed-pool searching (MEVblocker)
Wallet watching
Liquidations (Aave and others)
Local EVM simulation using pyrevm (and others)
Replaying transactions with sothis
Data extraction with cryo
Historical data recording with TimescaleDB
Visualization with Grafana
Distributed task queues with Celery
External API development with FastAPI
But I couldn’t cover any of these without finishing our exploration of transaction prediction and building a mempool-aware bot.
So…
LFG 🚀
New Features & Improvements
There are several pieces to highlight.
Replace web3-flashbots Middleware
The web3py flashbots middleware was a useful tool when we were learning, but it has several limitations.
It can only be used “out of the box” with one relay. If you want to connect to two relays, you must create two web3 objects and inject the middleware into each of them. That’s fine enough, but it’s a very heavy approach.
It is slow and consists of synchronous code that cannot be easily modified. It uses the requests module for interaction with the relay endpoint, which is robust but not performant when used in async code.
It tightly couples the code to the internals of web3.py and obscures the HTTP calls being used. It is difficult to extend to relays that are not call-for-call compatible with Flashbots.
To that end, I’ve removed the middleware and replaced it with lightweight implementations that call the eth_callBundle (for simulation) and eth_sendBundle (for sending) endpoints via the aiohttp module for async HTTP.
A key advantage of this module is that I can easily use it with multiple Flashbots-compatible endpoints. The example uses Flashbots and Builder0x69. There are many more, and the reader should modify the list of relays (and the associated HTTP calls and headers) to integrate with those services.
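For illustration, here is a minimal sketch of an async eth_sendBundle call over aiohttp, assuming the standard Flashbots authentication scheme (an X-Flashbots-Signature header carrying an EIP-191 signature of the request body’s keccak hash). The relay URLs are placeholders, and eth_callBundle follows the same pattern with different params, so treat this as a starting point rather than the project’s exact implementation:

import asyncio
import json

import aiohttp
from eth_account import Account
from eth_account.messages import encode_defunct
from web3 import Web3

# assumption: check each relay's documented endpoint and parameter format
RELAY_URLS = [
    "https://relay.flashbots.net",
    "https://builder0x69.io",
]


async def send_bundle(session, relay_url, signer, raw_txs, target_block):
    payload = json.dumps(
        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_sendBundle",
            "params": [{"txs": raw_txs, "blockNumber": hex(target_block)}],
        }
    )
    # Flashbots-style request signing: sign the keccak hash of the exact body
    message = encode_defunct(text=Web3.keccak(text=payload).hex())
    signature = Account.sign_message(message, signer.key).signature.hex()
    async with session.post(
        relay_url,
        data=payload,
        headers={
            "Content-Type": "application/json",
            "X-Flashbots-Signature": f"{signer.address}:{signature}",
        },
    ) as response:
        return await response.json()


async def broadcast_to_all_relays(signer, raw_txs, target_block):
    async with aiohttp.ClientSession() as session:
        # fire the bundle at every relay concurrently
        return await asyncio.gather(
            *(
                send_bundle(session, url, signer, raw_txs, target_block)
                for url in RELAY_URLS
            ),
            return_exceptions=True,
        )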
Access Lists
This project integrates an access list generator that will use the eth_createAccessList endpoint of your geth (or compatible) node.
In testing I’ve found that the gas estimate returned alongside the generated access list occasionally comes back much lower than the actual usage. Profitability depends heavily on gas use, so I’ve included an “emergency relief” check that will abort the bundle if the final simulated gas usage exceeds the estimate.
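A minimal sketch of both pieces, using web3.py’s generic make_request since eth_createAccessList has no dedicated wrapper (the node URL is a placeholder):

from web3 import Web3

w3 = Web3(Web3.HTTPProvider("http://localhost:8545"))  # assumption: local node URL


def get_access_list(transaction: dict) -> tuple:
    # eth_createAccessList returns the generated access list plus a gas estimate
    response = w3.provider.make_request(
        "eth_createAccessList",
        [transaction, "latest"],
    )
    result = response["result"]
    return result["accessList"], int(result["gasUsed"], 16)


def passes_gas_check(simulated_gas: int, estimated_gas: int) -> bool:
    # "emergency relief": reject the bundle if the final simulated gas usage
    # exceeds the estimate used for the profitability calculation
    return simulated_gas <= estimated_gas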
Process Pool Calculation
The example uses a process pool executor to calculate arbitrage values across multiple processes. For high-scale bots this is a big performance improvement, assuming you have a capable machine with extra processing power. It will not help you on small VPS instances, for example.
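A minimal sketch of dispatching calculations through a process pool from async code. The _calculate call mirrors the stateless method described in the next section, but its exact signature is an assumption:

import asyncio
from concurrent.futures import ProcessPoolExecutor


# module-level worker: the helper is pickled into the worker process,
# so results must be returned, not stored on the original object
def _pool_worker(arb_helper, override_state):
    return arb_helper._calculate(override_state=override_state)  # assumed signature


async def calculate_all(arb_helpers, override_state=None):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        futures = [
            loop.run_in_executor(executor, _pool_worker, arb, override_state)
            for arb in arb_helpers
        ]
        # keep exceptions so one failed calculation does not kill the batch
        return await asyncio.gather(*futures, return_exceptions=True)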
Stateless Arbitrage Helper Calculation
I’ve been experimenting with stateless calculations within the UniswapLpCycle class. I’ve come to dislike the best attribute. It is a simple dictionary within each helper that stores the most recent arbitrage calculation results. It makes sense to store these values for an on-chain bot that may observe an arbitrage opportunity across several blocks and only broadcast the transaction when gas is appropriate. But for a transient mempool bot, there are several possible pool states associated with multiple transactions.
When using a process pool for calculation, you must maintain a mental model that keeps track of “which object am I using?” when interacting and passing data around. When I explored using the pickle module for liquidity snapshotting, I quickly realized that doing an arbitrage calculation in a separate process could get messy. Since the pickled/unpickled object in the side process is unrelated to the original helper object, it makes no sense to store the arbitrage calculation results in the best attribute, since that object gets garbage collected after the calculation is complete. My first hacky workaround was the calculate_arbitrage_return_best function, which just returned a copy of the best dictionary so that the results could be passed back across the process boundary.
That was just for an on-chain bot that only needed to maintain results for a single pool state. But with a mempool bot, there are multiple pool states that might occur, and maintaining references to all of these within a single object is just a bookkeeping nightmare.
So I’ve taken a different approach that solves both challenges. Rather than attempting to store results within the helper, I’ve written a new “internal” method called _calculate that does the familiar calculation, but packages the results into a new data class called ArbitrageCalculationResult.
This data class is defined as:
@dataclasses.dataclass(slots=True, frozen=True)
class ArbitrageCalculationResult:
    id: str
    input_token: Erc20Token
    profit_token: Erc20Token
    input_amount: int
    profit_amount: int
    swap_amounts: List
The advantage of this approach is that I can extend the class later if more values are needed.
The class has everything I need to evaluate the profitability of this particular arbitrage, and then to generate the payloads associated with it, without needing to store anything in the helper.
If you’re still using the calculate_arbitrage method, that’s fine. It still works without issue, and in fact defers its analysis to that same _calculate method. It’s now a wrapper that unpacks the ArbitrageCalculationResult at the end and stores the values in best.
I will be transitioning to this stateless approach going forward, but will leave these wrappers in place to avoid breaking any bots built on the old code.
How Do I Use The Results?
The project takes this approach (pseudo-code):
Get arbitrage result with an overridden pool state from helper
Evaluate the result for profitability
If profitable:
    Identify arbitrage helper from ID attribute
    Generate payload by sending swap_amounts back to arbitrage helper
    Broadcast transaction
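In code, that flow might look like the following minimal sketch. The helper registry, the profit threshold, and the generate_payloads signature are all assumptions, so adjust them to your own bot’s structure:

# hypothetical registry built at startup: helper ID -> helper object
arb_helpers_by_id = {arb.id: arb for arb in all_arb_helpers}  # all_arb_helpers is hypothetical

MIN_PROFIT_WETH = int(0.001 * 10**18)  # hypothetical profit floor, in wei


async def process_result(result):
    # evaluate the result for profitability
    if result.profit_amount < MIN_PROFIT_WETH:
        return

    # identify the arbitrage helper from the ID attribute
    arb_helper = arb_helpers_by_id[result.id]

    # generate the payloads by sending the swap amounts back to the helper
    # (assumed method name and signature)
    payloads = arb_helper.generate_payloads(swap_amounts=result.swap_amounts)

    # sign a transaction wrapping these payloads and broadcast it,
    # e.g. with the relay sender sketched earlier
    await submit_bundle(payloads)  # hypothetical broadcast step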
Multi-Arbitrage Processing
A mempool transaction may pass through several pools. It is simple to identify arbitrage paths that align with these pools (see Part VII for the discussion on using Python’s set for this task).
After identifying all possible arbitrage paths that cross the pools in that transaction, you can evaluate their profitability at the new pool states.
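A minimal sketch of that set-based lookup, assuming an index from pool address to arbitrage path IDs built from the parser output shown later:

# built once at startup: pool address -> set of arb path IDs crossing that pool
arbs_by_pool: dict = {}
for arb_id, arb in two_pool_arb_paths.items():  # plus the 3pool paths
    for pool_address in arb["path"]:
        arbs_by_pool.setdefault(pool_address, set()).add(arb_id)


def affected_arb_ids(pools_touched_by_tx) -> set:
    # union of all path IDs that share at least one pool with the transaction
    ids: set = set()
    for pool_address in pools_touched_by_tx:
        ids |= arbs_by_pool.get(pool_address, set())
    return ids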
Once you have that, what’s next?
It’s tempting to just sort by profit and execute the first one, but that is a simplistic approach that leaves money on the table.
Consider a swap with USDC → WETH → WBTC. This crosses two pools (USDC/WETH, and WETH/WBTC). Therefore any arbitrage path that uses either of these pools is eligible for analysis.
However, some arbitrage paths may only use one of the pools. Consider a 2pool arbitrage between the USDC/WETH pools at Uniswap and Sushiswap. It doesn’t care about the WETH/WBTC leg at all. And similarly, a 2pool arbitrage between the WETH/WBTC pools does not care about the USDC/WETH leg.
So you can generate two separate arbitrage transactions from this mempool swap. The challenge is sorting them, but again Python’s set makes this simple.
I won’t get too deep into the code here, but here is the pseudo-code that the bot uses to identify multiple arbitrage opportunities from a multi-pool transaction:
Calculate future pool states from mempool transaction
Identify all arbitrage paths that share a pool
Calculate profitability for all paths
Sort paths by gross profit, discarding profit < some threshold
While loop:
    Select/save the highest profit path from all paths
    Discard paths that share a pool with the selected arb
    [repeat]
Generate and broadcast transaction(s) for non-overlapping paths
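As a minimal sketch, the while loop reduces to a greedy pass over the results sorted by profit. I assume here that each calculation result exposes a gross profit and the set of pool addresses its path crosses:

def select_non_overlapping(results, min_profit: int):
    # sort by gross profit (best first), discarding anything below the threshold
    candidates = sorted(
        (r for r in results if r.profit_amount >= min_profit),
        key=lambda r: r.profit_amount,
        reverse=True,
    )
    selected = []
    used_pools: set = set()
    for arb in candidates:
        pools = set(arb.pool_addresses)  # assumed attribute
        if pools & used_pools:
            continue  # shares a pool with a higher-profit selection
        selected.append(arb)
        used_pools |= pools
    return selected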
An improvement for this technique is to package the multiple arbitrage transactions into a single bundle instead of generating multiple bundles. Smart readers can do this with little trouble.
New Pool Events
It’s common to observe new pools being created by the various Uniswap factories. The event watcher will now generate a pool helper for these newly-created pools inside the process_new_v2_pool_event and process_new_v3_pool_event functions.
It doesn’t do much with them afterwards, but will set us up for interesting stuff later.
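A minimal sketch of such a handler, assuming degenbot’s LiquidityPool helper and a plain dict as the bot’s pool registry (constructor details vary between versions):

import degenbot as bot


def process_new_v2_pool_event(event, all_pools: dict) -> None:
    pool_address = event["args"]["pair"]
    try:
        # build a helper for the newly-created pool
        pool_helper = bot.LiquidityPool(address=pool_address, silent=True)
    except Exception as e:
        print(f"Could not build V2 pool helper for {pool_address}: {e}")
        return
    all_pools[pool_helper.address] = pool_helper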
A future improvement will be a generator that can “hot load” an arbitrage helper when a new pool is created, so a bot could scale up automatically instead of needing restarts to add new paths.
Smart Contract — Flexible V3 Callback
The executor smart contract now reads the factory address in the uniswapV3SwapCallback function, and can verify pool addresses for pools generated by the Uniswap and Sushiswap factories.
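The check mirrors the standard CREATE2 address derivation. For a sanity check outside the contract, here is the same derivation in Python (a sketch assuming eth_abi v4+ and web3 v6+ naming; the init code hash is the constant used in the contract below):

from eth_abi import encode
from web3 import Web3

POOL_INIT_CODE_HASH = "0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54"


def v3_pool_address(factory: str, token0: str, token1: str, fee: int) -> str:
    # CREATE2: keccak(0xff ++ factory ++ salt ++ init_code_hash)[12:]
    salt = Web3.keccak(encode(["address", "address", "uint24"], [token0, token1, fee]))
    raw = Web3.keccak(
        b"\xff"
        + bytes.fromhex(factory[2:])
        + salt
        + bytes.fromhex(POOL_INIT_CODE_HASH[2:])
    )
    return Web3.to_checksum_address(raw[12:])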
Smart Contract — Balance Check
The executor smart contract implements a balance check in execute_payloads that reverts if the final WETH balance is reduced after all payloads are delivered. I introduced this feature HERE.
Source Code
Pool Fetcher
ethereum_lp_fetcher_uniswapv2_json.py
import brownie
import sys
import os
import json

BROWNIE_NETWORK = "mainnet-local"

os.environ["ETHERSCAN_TOKEN"] = EDITME

# maximum blocks to process with getLogs
BLOCK_SPAN = 1_000

try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "SushiSwap",
        "filename": "ethereum_lps_sushiswapv2.json",
        "factory_address": "0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac",
        "factory_deployment_block": 10_794_229,
        "pool_type": "SushiswapV2",
    },
    {
        "name": "Uniswap V2",
        "filename": "ethereum_lps_uniswapv2.json",
        "factory_address": "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f",
        "factory_deployment_block": 10_000_835,
        "pool_type": "UniswapV2",
    },
]

current_block = brownie.chain.height

for name, factory_address, filename, deployment_block, pool_type in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
        exchange["pool_type"],
    )
    for exchange in exchanges
]:
    print(f"DEX: {name}")

    try:
        factory_contract = brownie.Contract(factory_address)
    except:
        try:
            factory_contract = brownie.Contract.from_explorer(factory_address)
        except:
            factory_contract = None
    finally:
        if factory_contract is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
        if i + BLOCK_SPAN > current_block:
            end_block = current_block
        else:
            end_block = i + BLOCK_SPAN

        if pool_created_events := factory_contract.events.PairCreated.getLogs(
            fromBlock=i, toBlock=end_block
        ):
            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.get("pair"),
                        "token0": event.args.get("token0"),
                        "token1": event.args.get("token1"),
                        "block_number": event.get("blockNumber"),
                        "pool_id": event.args.get(""),
                        "type": pool_type,
                    }
                )

    with open(filename, "w") as file:
        json.dump(lp_data, file, indent=2)

    print(f"Saved {len(lp_data)} pools ({len(lp_data) - previous_pool_count} new)")
ethereum_lp_fetcher_uniswapv3_json.py
import brownie
import sys
import os
import json

BROWNIE_NETWORK = "mainnet-local"

os.environ["ETHERSCAN_TOKEN"] = EDITME

# maximum blocks to process with getLogs
BLOCK_SPAN = 5_000

try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "Uniswap V3",
        "filename": "ethereum_lps_uniswapv3.json",
        "factory_address": "0x1F98431c8aD98523631AE4a59f267346ea31F984",
        "factory_deployment_block": 12_369_621,
        "pool_type": "UniswapV3",
    },
    {
        "name": "Sushiswap V3",
        "filename": "ethereum_lps_sushiswapv3.json",
        "factory_address": "0xbACEB8eC6b9355Dfc0269C18bac9d6E2Bdc29C4F",
        "factory_deployment_block": 16_955_547,
        "pool_type": "SushiswapV3",
    },
]

current_block = brownie.chain.height

for name, factory_address, filename, deployment_block, pool_type in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
        exchange["pool_type"],
    )
    for exchange in exchanges
]:
    print(f"DEX: {name}")

    try:
        factory = brownie.Contract(factory_address)
    except:
        try:
            factory = brownie.Contract.from_explorer(factory_address)
        except:
            factory = None
    finally:
        if factory is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    factory_contract = brownie.web3.eth.contract(
        address=factory.address, abi=factory.abi
    )

    for i in range(previous_block + 1, current_block + 1, BLOCK_SPAN):
        if i + BLOCK_SPAN > current_block:
            end_block = current_block
        else:
            end_block = i + BLOCK_SPAN

        if pool_created_events := factory_contract.events.PoolCreated.getLogs(
            fromBlock=i, toBlock=end_block
        ):
            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.pool,
                        "fee": event.args.fee,
                        "token0": event.args.token0,
                        "token1": event.args.token1,
                        "block_number": event.blockNumber,
                        "type": pool_type,
                    }
                )

    with open(filename, "w") as file:
        json.dump(lp_data, file, indent=2)

    print(f"Saved {len(lp_data)} pools ({len(lp_data) - previous_pool_count} new)")
Liquidity Fetcher
I’ve included an improvement here that uses a single mocked V3LiquidityPool to do the external updates instead of building a new object each time.
ethereum_uniswapv3_liquidity_events_fetcher.py
import json
import sys
from threading import Lock
from typing import Dict

import brownie
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params

import degenbot as bot
from degenbot.uniswap.v3.v3_liquidity_pool import (
    UniswapV3BitmapAtWord,
    UniswapV3LiquidityAtTick,
    UniswapV3PoolExternalUpdate,
)

BROWNIE_NETWORK = "mainnet-local"
SNAPSHOT_FILENAME = "ethereum_v3_liquidity_snapshot.json"
UNISWAPV3_START_BLOCK = 12_369_621

TICKSPACING_BY_FEE: Dict = {
    100: 1,
    500: 10,
    3000: 60,
    10000: 200,
}


class MockV3LiquidityPool(bot.V3LiquidityPool):
    def __init__(self):
        pass


def prime_pools():
    print("Starting pool primer")

    liquidity_snapshot: Dict[str, Dict] = {}
    lp_data: Dict[str, Dict] = {}

    for path in [
        "ethereum_lps_sushiswapv3.json",
        "ethereum_lps_uniswapv3.json",
    ]:
        try:
            with open(path, "r") as file:
                l = json.load(file)
            for lp in l:
                lp_data[lp["pool_address"]] = lp
        except Exception as e:
            print(e)

    try:
        with open(SNAPSHOT_FILENAME, "r") as file:
            json_liquidity_snapshot = json.load(file)
    except:
        snapshot_last_block = None
    else:
        snapshot_last_block = json_liquidity_snapshot.pop("snapshot_block")
        print(
            f"Loaded LP snapshot: {len(json_liquidity_snapshot)} pools @ block {snapshot_last_block}"
        )

        assert (
            snapshot_last_block < newest_block
        ), f"Aborting, snapshot block ({snapshot_last_block}) is newer than current chain height ({newest_block})"

        # Transform the JSON-encoded info from the snapshot to the dataclass
        # used by V3LiquidityPool
        for pool_address, snapshot in json_liquidity_snapshot.items():
            liquidity_snapshot[pool_address] = {
                "tick_bitmap": {
                    int(k): UniswapV3BitmapAtWord(**v)
                    for k, v in snapshot["tick_bitmap"].items()
                },
                "tick_data": {
                    int(k): UniswapV3LiquidityAtTick(**v)
                    for k, v in snapshot["tick_data"].items()
                },
            }

    V3LP = brownie.web3.eth.contract(abi=bot.uniswap.v3.abi.UNISWAP_V3_POOL_ABI)

    liquidity_events = {}

    for event in [V3LP.events.Mint, V3LP.events.Burn]:
        print(f"processing {event.event_name} events")

        start_block = (
            max(UNISWAPV3_START_BLOCK, snapshot_last_block + 1)
            if snapshot_last_block is not None
            else UNISWAPV3_START_BLOCK
        )
        block_span = 10_000
        done = False

        event_abi = event._get_event_abi()

        while not done:
            end_block = min(newest_block, start_block + block_span)

            _, event_filter_params = construct_event_filter_params(
                event_abi=event_abi,
                abi_codec=brownie.web3.codec,
                argument_filters={},
                fromBlock=start_block,
                toBlock=end_block,
            )

            try:
                event_logs = brownie.web3.eth.get_logs(event_filter_params)
            except:
                block_span = int(0.75 * block_span)
                continue

            for event in event_logs:
                decoded_event = get_event_data(brownie.web3.codec, event_abi, event)

                pool_address = decoded_event["address"]
                block = decoded_event["blockNumber"]
                tx_index = decoded_event["transactionIndex"]
                liquidity = decoded_event["args"]["amount"] * (
                    -1 if decoded_event["event"] == "Burn" else 1
                )
                tick_lower = decoded_event["args"]["tickLower"]
                tick_upper = decoded_event["args"]["tickUpper"]

                # skip zero liquidity events
                if liquidity == 0:
                    continue

                try:
                    liquidity_events[pool_address]
                except KeyError:
                    liquidity_events[pool_address] = []

                liquidity_events[pool_address].append(
                    (
                        block,
                        tx_index,
                        (
                            liquidity,
                            tick_lower,
                            tick_upper,
                        ),
                    )
                )

            print(f"Fetched events: block span [{start_block},{end_block}]")

            if end_block == newest_block:
                done = True
            else:
                start_block = end_block + 1
                block_span = int(1.05 * block_span)

    lp_helper = MockV3LiquidityPool()
    lp_helper.sparse_bitmap = False
    lp_helper._liquidity_lock = Lock()
    lp_helper._slot0_lock = Lock()
    lp_helper._update_log = list()

    for pool_address in liquidity_events.keys():
        # Ignore all pool addresses not held in the LP data dict. A strange
        # pool (0x820e891b14149e98b48b39ee2667157Ef750539b) was triggering an
        # early termination because it had liquidity events, but was not
        # associated with the known factories.
        if not lp_data.get(pool_address):
            continue

        try:
            previous_snapshot_tick_data = liquidity_snapshot[pool_address][
                "tick_data"
            ]
        except KeyError:
            previous_snapshot_tick_data = {}

        try:
            previous_snapshot_tick_bitmap = liquidity_snapshot[pool_address][
                "tick_bitmap"
            ]
        except KeyError:
            previous_snapshot_tick_bitmap = {}

        lp_helper.address = "0x0000000000000000000000000000000000000000"
        lp_helper.liquidity = 1 << 256
        lp_helper.tick_data = previous_snapshot_tick_data
        lp_helper.tick_bitmap = previous_snapshot_tick_bitmap
        lp_helper.update_block = snapshot_last_block or UNISWAPV3_START_BLOCK
        lp_helper.liquidity_update_block = (
            snapshot_last_block or UNISWAPV3_START_BLOCK
        )
        lp_helper.tick = 0
        lp_helper.fee = lp_data[pool_address]["fee"]
        lp_helper.tick_spacing = TICKSPACING_BY_FEE[lp_helper.fee]

        sorted_liquidity_events = sorted(
            liquidity_events[pool_address],
            key=lambda event: (event[0], event[1]),
        )

        for liquidity_event in sorted_liquidity_events:
            (
                event_block,
                _,
                (liquidity_delta, tick_lower, tick_upper),
            ) = liquidity_event

            # Push the liquidity events into the mock helper
            lp_helper.external_update(
                update=UniswapV3PoolExternalUpdate(
                    block_number=event_block,
                    liquidity_change=(
                        liquidity_delta,
                        tick_lower,
                        tick_upper,
                    ),
                ),
            )

        # After all events have been pushed, update the liquidity snapshot with
        # the full liquidity data from the helper
        try:
            liquidity_snapshot[pool_address]
        except KeyError:
            liquidity_snapshot[pool_address] = {
                "tick_bitmap": {},
                "tick_data": {},
            }

        liquidity_snapshot[pool_address]["tick_bitmap"].update(lp_helper.tick_bitmap)
        liquidity_snapshot[pool_address]["tick_data"].update(lp_helper.tick_data)

    for pool_address in liquidity_snapshot:
        # Convert all liquidity data to JSON format so it can be exported
        liquidity_snapshot[pool_address] = {
            "tick_data": {
                key: value.to_dict()
                for key, value in liquidity_snapshot[pool_address][
                    "tick_data"
                ].items()
            },
            "tick_bitmap": {
                key: value.to_dict()
                for key, value in liquidity_snapshot[pool_address][
                    "tick_bitmap"
                ].items()
                if value.bitmap  # skip empty bitmaps
            },
        }

    liquidity_snapshot["snapshot_block"] = newest_block

    with open(SNAPSHOT_FILENAME, "w") as file:
        json.dump(
            liquidity_snapshot,
            file,
            indent=2,
            sort_keys=True,
        )
    print("Wrote LP snapshot")


try:
    brownie.network.connect(BROWNIE_NETWORK)
except:
    sys.exit(
        "Could not connect! Verify your Brownie network settings using 'brownie networks list'"
    )

newest_block = brownie.chain.height

if __name__ == "__main__":
    prime_pools()
    print("Complete")
Pool Parser
ethereum_parser_2pool.py
import json
import web3
import networkx as nx
import itertools

WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

BLACKLISTED_TOKENS = [
    # add addresses here if you want to exclude a token from consideration during pathfinding
    # e.g.
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7", # USDT
]

sushiv2_lp_data = {}
univ2_lp_data = {}
sushiv3_lp_data = {}
univ3_lp_data = {}

with open("ethereum_lps_sushiswapv2.json") as file:
    for pool in json.load(file):
        sushiv2_lp_data[pool.get("pool_address")] = {
            key: value for key, value in pool.items() if key not in ["pool_id"]
        }
print(f"Found {len(sushiv2_lp_data)} Sushiswap V2 pools")

with open("ethereum_lps_uniswapv2.json") as file:
    for pool in json.load(file):
        univ2_lp_data[pool.get("pool_address")] = {
            key: value for key, value in pool.items() if key not in ["pool_id"]
        }
print(f"Found {len(univ2_lp_data)} Uniswap V2 pools")

with open("ethereum_lps_sushiswapv3.json") as file:
    for pool in json.load(file):
        sushiv3_lp_data[pool.get("pool_address")] = {
            key: value
            for key, value in pool.items()
            if key not in ["block_number"]
        }
print(f"Found {len(sushiv3_lp_data)} Sushiswap V3 pools")

with open("ethereum_lps_uniswapv3.json") as file:
    for pool in json.load(file):
        univ3_lp_data[pool.get("pool_address")] = {
            key: value
            for key, value in pool.items()
            if key not in ["block_number"]
        }
print(f"Found {len(univ3_lp_data)} Uniswap V3 pools")

# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()

for pool in univ2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in sushiv2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="SushiswapV2",
    )

for pool in univ3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )

for pool in sushiv3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="SushiswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding two-pool arbitrage paths ***")
two_pool_arb_paths = {}

for token in all_tokens_with_weth_pool:
    pools = G.get_edge_data(token, WETH_ADDRESS).values()

    # skip tokens with only one pool
    if len(pools) < 2:
        continue

    for pool_a, pool_b in itertools.permutations(pools, 2):
        pool_a_address = pool_a["lp_address"]
        pool_b_address = pool_b["lp_address"]
        pool_a_type = pool_a["pool_type"]
        pool_b_type = pool_b["pool_type"]

        if pool_a_type == "UniswapV2":
            pool_a_dict = univ2_lp_data[pool_a_address]
        elif pool_a_type == "SushiswapV2":
            pool_a_dict = sushiv2_lp_data[pool_a_address]
        elif pool_a_type == "UniswapV3":
            pool_a_dict = univ3_lp_data[pool_a_address]
        elif pool_a_type == "SushiswapV3":
            pool_a_dict = sushiv3_lp_data[pool_a_address]
        else:
            raise Exception(f"could not identify pool {pool_a}")

        if pool_b_type == "UniswapV2":
            pool_b_dict = univ2_lp_data[pool_b_address]
        elif pool_b_type == "SushiswapV2":
            pool_b_dict = sushiv2_lp_data[pool_b_address]
        elif pool_b_type == "UniswapV3":
            pool_b_dict = univ3_lp_data[pool_b_address]
        elif pool_b_type == "SushiswapV3":
            pool_b_dict = sushiv3_lp_data[pool_b_address]
        else:
            raise Exception(f"could not identify pool {pool_b}")

        two_pool_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [
                            pool_a.get("lp_address")[2:],
                            pool_b.get("lp_address")[2:],
                        ]
                    )
                ).hex()
            ),
            "pools": {
                pool_a.get("lp_address"): pool_a_dict,
                pool_b.get("lp_address"): pool_b_dict,
            },
            "arb_types": ["cycle", "flash_borrow_lp_swap"],
            "path": [pool.get("lp_address") for pool in [pool_a, pool_b]],
        }

print(f"Found {len(two_pool_arb_paths)} unique two-pool arbitrage paths")

print("• Saving arb paths to JSON")
with open("ethereum_arbs_2pool.json", "w") as file:
    json.dump(two_pool_arb_paths, file, indent=2)
ethereum_parser_3pool.py
import json
import web3
import networkx as nx
import itertools
import time

WETH_ADDRESS = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"

start_timer = time.perf_counter()

BLACKLISTED_TOKENS = [
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7", # USDT
]

v2_lp_data = {}
for filename in [
    "ethereum_lps_sushiswapv2.json",
    "ethereum_lps_uniswapv2.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v2_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["pool_id"]
            }
print(f"Found {len(v2_lp_data)} V2 pools")

v3_lp_data = {}
for filename in [
    "ethereum_lps_sushiswapv3.json",
    "ethereum_lps_uniswapv3.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v3_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["block_number"]
            }
print(f"Found {len(v3_lp_data)} V3 pools")

all_v2_pools = set(v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())

all_tokens = set(
    [lp.get("token0") for lp in v2_lp_data.values()]
    + [lp.get("token1") for lp in v2_lp_data.values()]
    + [lp.get("token0") for lp in v3_lp_data.values()]
    + [lp.get("token1") for lp in v3_lp_data.values()]
)

# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()

for pool in v2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in v3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding triangular arbitrage paths ***")
triangle_arb_paths = {}

# only consider tokens with degree > 1 (number of pools holding the token)
filtered_tokens = [
    token for token in all_tokens_with_weth_pool if G.degree(token) > 1
]
print(f"Processing {len(filtered_tokens)} tokens with degree > 1")

# loop through all possible token pairs
for token_a, token_b in itertools.combinations(filtered_tokens, 2):
    # find tokenA/tokenB pools, skip if a tokenA/tokenB pool is not found
    if not G.get_edge_data(token_a, token_b):
        continue

    inside_pools = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, token_b).values()
    ]

    # find tokenA/WETH pools
    outside_pools_tokenA = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, WETH_ADDRESS).values()
    ]

    # find tokenB/WETH pools
    outside_pools_tokenB = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_b, WETH_ADDRESS).values()
    ]

    # find all triangular arbitrage paths of form:
    # tokenA/WETH -> tokenA/tokenB -> tokenB/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenA, inside_pools, outside_pools_tokenB
    ):
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v2_lp_data.get(pool_address).items()
                    }
                }
            elif pool_address in all_v3_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v3_lp_data.get(pool_address).items()
                    }
                }
            else:
                raise Exception
            pool_data.update(pool_info)

        triangle_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [pool_address[2:] for pool_address in pool_addresses]
                    )
                ).hex()
            ),
            "path": pool_addresses,
            "pools": pool_data,
        }

    # find all triangular arbitrage paths of form:
    # tokenB/WETH -> tokenA/tokenB -> tokenA/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenB, inside_pools, outside_pools_tokenA
    ):
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v2_lp_data.get(pool_address).items()
                    }
                }
            elif pool_address in all_v3_pools:
                pool_info = {
                    pool_address: {
                        key: value
                        for key, value in v3_lp_data.get(pool_address).items()
                    }
                }
            else:
                raise Exception
            pool_data.update(pool_info)

        triangle_arb_paths[id] = {
            "id": (
                id := web3.Web3.keccak(
                    hexstr="".join(
                        [pool_address[2:] for pool_address in pool_addresses]
                    )
                ).hex()
            ),
            "path": pool_addresses,
            "pools": pool_data,
        }

print(
    f"Found {len(triangle_arb_paths)} triangle arb paths in {time.perf_counter() - start_timer:.1f}s"
)

print("• Saving arb paths to JSON")
with open("ethereum_arbs_3pool.json", "w") as file:
    json.dump(triangle_arb_paths, file, indent=2)
Smart Contract
ethereum_executor_v3_balancecheck.vy
# @version ^0.3

from vyper.interfaces import ERC20 as IERC20

interface IWETH:
    def deposit(): payable

interface IUniswapV2Pool:
    def factory() -> address: view
    def swap(
        amount0Out: uint256,
        amount1Out: uint256,
        to: address,
        data: Bytes[1024]
    ): nonpayable
    def token0() -> address: view
    def token1() -> address: view

interface IUniswapV3Pool:
    def factory() -> address: view
    def fee() -> uint24: view
    def tickSpacing() -> int24: view
    def token0() -> address: view
    def token1() -> address: view
    def maxLiquidityPerTick() -> uint128: view
    def swap(
        recipient: address,
        zeroForOne: bool,
        amountSpecified: int256,
        sqrtPriceLimitX96: uint160,
        data: Bytes[32]
    ) -> (int256, int256): nonpayable

OWNER_ADDR: immutable(address)
WETH_ADDR: constant(address) = 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2
POOL_INIT_CODE_HASH: constant(bytes32) = 0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54

MAX_PAYLOADS: constant(uint256) = 16
MAX_PAYLOAD_BYTES: constant(uint256) = 1024

struct payload:
    target: address
    calldata: Bytes[MAX_PAYLOAD_BYTES]
    value: uint256

@external
@payable
def __init__():
    OWNER_ADDR = msg.sender

    # wrap initial Ether to WETH
    if msg.value > 0:
        IWETH(WETH_ADDR).deposit(value=msg.value)

@external
@payable
def execute_payloads(
    payloads: DynArray[payload, MAX_PAYLOADS],
    balance_check: bool = True,
):
    assert msg.sender == OWNER_ADDR, "!OWNER"

    weth_balance_before: uint256 = empty(uint256)
    if balance_check:
        weth_balance_before = IERC20(WETH_ADDR).balanceOf(self)

    for _payload in payloads:
        raw_call(
            _payload.target,
            _payload.calldata,
            value=_payload.value,
        )

    if balance_check:
        assert IERC20(WETH_ADDR).balanceOf(self) > weth_balance_before, "WETH BALANCE REDUCTION"

@external
@payable
def uniswapV3SwapCallback(
    amount0_delta: int256,
    amount1_delta: int256,
    data: Bytes[32]
):
    # reject callbacks that did not originate from the owner's EOA
    assert tx.origin == OWNER_ADDR, "!OWNER"
    assert amount0_delta > 0 or amount1_delta > 0, "REJECTED 0 LIQUIDITY SWAP"

    # get the token0/token1 addresses and fee reported by msg.sender
    factory: address = IUniswapV3Pool(msg.sender).factory()
    token0: address = IUniswapV3Pool(msg.sender).token0()
    token1: address = IUniswapV3Pool(msg.sender).token1()
    fee: uint24 = IUniswapV3Pool(msg.sender).fee()

    assert msg.sender == convert(
        slice(
            keccak256(
                concat(
                    0xFF,
                    convert(factory, bytes20),
                    keccak256(_abi_encode(token0, token1, fee)),
                    POOL_INIT_CODE_HASH,
                )
            ),
            12,
            20,
        ),
        address
    ), "INVALID V3 LP ADDRESS"

    # repay token back to pool
    if amount0_delta > 0:
        IERC20(token0).transfer(msg.sender, convert(amount0_delta, uint256))
    else:
        IERC20(token1).transfer(msg.sender, convert(amount1_delta, uint256))

@external
@payable
def __default__():
    # accept basic Ether transfers to the contract with no calldata
    if len(msg.data) == 0:
        return
    # revert on all other calls
    else:
        raise
Mempool Backrun Bot
ethereum_flashbots_backrun.py