I’ve been experimenting on Arbitrum for the last month or so. It is a convenient place to practice live botting against other searchers. Uniswap V3 is deployed on Arbitrum, plus Uniswap V2 forks like Sushiswap. It is EVM-compatible so mainnet contracts can be deployed with minor modification, it’s fast, and it’s cheap!
It is by far the most popular Ethereum layer two blockchain, so you’ll find plenty of opportunity if you look into it.
I am going to share example code that will let you run and test WETH cycle arbitrage on Arbitrum. It is not optimized but is fairly well-tested and competitive enough for low-effort experimentation. I’ve profited roughly a quarter ETH over ~1000 transactions (with some hilarious misfires along the way).
You’re not going to get rich copy-pasting (remember, hundreds of other people will be reading this same post), but it should give you a great place to begin modifications if you want to chase some more exotic opportunities.
I will shut the bot down after publishing this so you won’t be going against me at least.
And as always, you should inspect the code, understand how it works, and ask me questions! If you allow autonomous code to spend your ETH, you get to keep the profits AND the losses. So please be responsible and careful but remember to have some fun (this is fun, remember?)
Even if you don’t care about Arbitrum, the bot code builds on the Flashbots projects from a few months ago. I have continued to improve the code and have added some nice new features you’ll likely want to implement.
Arbitrum Differences Compared to Mainnet
No Mempool
Arbitrum does not have a mempool. The closest you can get is to Read the Arbitrum Sequencer Feed. The sequencer publishes an ordered list of transactions that will be recorded to mainnet. Each transaction is published slightly before the block is built, so you have a slight advantage compared to those watching ex post facto chain updates.
The sequencer only publishes a transaction after it has guaranteed the ordering. Since the queue cannot be re-ordered, frontrunning and sandwiching are impossible. Backrunning is still possible, but that changes the game to be entirely latency-based. The sequencer is first-come, first-served (though a priority queue mechanism has recently been proposed), so whoever delivers their backrun to the sequencer first will secure the profit.
If your infrastructure is not set up in a low-latency data center in the same building as the Arbitrum sequencer, you’re not going to win this game. I’m running a node from my home, which can never compete with the co-location crew, so I don’t even bother.
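If you want to watch the feed anyway, here is a minimal sketch using the Python websockets package. The feed URL and the message field names are assumptions based on the public Arbitrum One broadcast feed, so verify both against the current Arbitrum documentation before building on this.

import asyncio
import json

import websockets  # pip install websockets

SEQUENCER_FEED_URL = "wss://arb1.arbitrum.io/feed"  # assumed public feed endpoint


async def watch_feed():
    async with websockets.connect(SEQUENCER_FEED_URL) as ws:
        async for raw_message in ws:
            broadcast = json.loads(raw_message)
            # each broadcast carries one or more sequenced messages; the actual
            # transaction payload (l2Msg) is base64-encoded and needs further
            # decoding before it is useful
            for entry in broadcast.get("messages", []):
                print(entry.get("sequenceNumber"))


if __name__ == "__main__":
    asyncio.run(watch_feed())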
Variable Gas Use
On mainnet, gas use is a function of storage use and computation cost. Assuming the EVM has not been changed via hardfork, the gas use for a transaction will be more or less constant.
Since Arbitrum records final state to Ethereum, the cost of a transaction depends on the computation performed on the L2 as well as the cost of posting its data to L1. The Arbitrum fee mechanism is outlined in 2-Dimensional Fees. The strange thing about Arbitrum gas is that the gas price scales only with L2 activity, while the fluctuating L1 cost is reflected in the gas used.
An Arbitrum transaction will have different gas use depending on the state of mainnet when it is executed.
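You can see this for yourself with a quick check. The sketch below assumes a local Nitro node on http://localhost:8547 (set up later in this post) and a recent web3.py: it estimates gas for the same zero-value transfer twice, a few minutes apart, and on Arbitrum the two numbers will usually differ as the L1 price moves.

import time

import web3

w3 = web3.Web3(web3.HTTPProvider("http://localhost:8547"))

# placeholder address used for a zero-value self-transfer; any valid address works
PLACEHOLDER = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"

tx = {"from": PLACEHOLDER, "to": PLACEHOLDER, "value": 0}

first_estimate = w3.eth.estimate_gas(tx)
time.sleep(300)  # give the L1 calldata price time to move
second_estimate = w3.eth.estimate_gas(tx)

# on mainnet these would match; on Arbitrum they usually will not
print(first_estimate, second_estimate)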
Gas Cost Differences
On Ethereum, gas costs are ordered as follows:
Storage = high
Computation = medium
Calldata = low
While on Arbitrum:
Storage = medium
Computation = low
Calldata = high
Searchers are therefore incentivized to minimize calldata and execute as much of their arbitrage logic within the contract as possible. On mainnet, it is very expensive to call other contracts to fetch values, so you should pack as much into calldata as you can to minimize computation, eliminate the use of storage, and simplify control flow.
However, slicing and dicing calldata on Arbitrum is cheap, and so is reading values from other contracts. You should do that!
For some more information on gas differences between L1 and L2, check out THIS excellent article from an Optimism developer.
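To put rough numbers on the mainnet incentive, here is a back-of-the-envelope sketch using EIP-2028 calldata pricing and EIP-2929 cold-access costs (these are mainnet figures, not a statement about the contract below): an extra ABI-encoded address in calldata costs a few hundred gas on mainnet, while fetching the same value from another contract costs thousands. On Arbitrum the same calldata bytes are charged at fluctuating L1 data prices, which is what flips the tradeoff.

def mainnet_calldata_gas(data: bytes) -> int:
    # EIP-2028 pricing: 16 gas per non-zero byte, 4 gas per zero byte
    return sum(16 if byte else 4 for byte in data)


# a 20-byte address padded to a 32-byte ABI word
address_word = bytes(12) + bytes.fromhex("82aF49447D8a07e3bd95BD0d56f35241523fBab1")
print(mainnet_calldata_gas(address_word))  # 368 gas

# versus fetching the same value on-chain on mainnet: a cold external call costs
# 2600 gas for the account access alone, plus 2100 gas per cold storage slot read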
NOTE: The executor contract below is a copy-paste of the mainnet contract, with some minor modifications. It makes no attempt at calldata minimization, so it is more expensive than it should be. I will return to this later when I attack the topic of contract optimizations.
Running an Arbitrum Nitro Node
First of all, consider running a node! You’re free to use an external RPC if you prefer, but I like the security and convenience of a local node wherever possible. Not having to work around rate limits is a huge win. Even if you only use it to avoid rate limits and cloud hosting fees, I believe you come out ahead.
My Arbitrum node currently consumes 338 GB of storage and keeps a CPU core lightly loaded.
Offchain Labs (the developers of Arbitrum) publish their Nitro node client to Docker Hub. The packages are all versioned with no convenient “latest” or “stable” tag, so I recommend subscribing to their GitHub repository and getting notifications for each new release (but don’t forget to manually update your Docker container afterwards).
You can set it up very simply with the following compose file. It exposes an HTTP and a WebSocket endpoint, retrieves L1 data from the public Ankr mainnet RPC, and attempts to initialize its state from a snapshot hosted by Arbitrum.
BUGFIX 2023-05-01: A user in Discord has reported that newer state snapshots are not extracted correctly. Please review the short demonstration after the compose file below to see how to work around this until it is fixed.
NOTE: If you intend to connect your Nitro node to a local Ethereum node also running in Docker, specify the L1 URL as the container name for your Ethereum node (mine is named geth). Also, be sure to place the two containers in the same network. You can do this by adding the Nitro node to the same docker-compose.yml file, or by specifying a particular network name for the Nitro node.
arbitrum-node/docker-compose.yml
version: "2"
services:
nitro:
image: docker.io/offchainlabs/nitro-node:v2.0.13-174496c
container_name: nitro
restart: unless-stopped
ports:
- "8547:8547/tcp"
- "8548:8548/tcp"
command:
- --http.port=8547
- --http.addr=0.0.0.0
- --http.corsdomain=*
- --http.vhosts=*
- --http.api=net,web3,eth
- --ws.port=8548
- --ws.addr=0.0.0.0
- --ws.origins=*
- --ws.api=net,web3,eth
# change this to localhost or a container name if you're running
# a local Ethereum node
- --l1.url=https://rpc.ankr.com/eth
- --l2.chain-id=42161
- --init.url=https://snapshot.arbitrum.io/mainnet/nitro.tar
volumes:
- nitro_node_data:/home/user/.arbitrum
volumes:
nitro_node_data:
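Once the container is up, a quick sanity check against the endpoints defined above (a sketch assuming a recent web3.py) confirms the node answers, reports the Arbitrum One chain ID, and has finished syncing:

import web3

w3 = web3.Web3(web3.HTTPProvider("http://localhost:8547"))

assert w3.is_connected(), "node is not answering on port 8547"
assert w3.eth.chain_id == 42161, "unexpected chain ID (expected Arbitrum One)"

print(f"syncing: {w3.eth.syncing}")  # False once the node has caught up
print(f"latest block: {w3.eth.block_number}")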
Snapshot Hotfix
EDIT: Arbitrum has fixed their snapshots, so the below workaround is no longer necessary.
I have not figured out the source of this bug, but recent snapshots from the Arbitrum website do not initialize correctly into the data volume. I will report this to the devs, but I cannot say when it will be fixed.
I’ve developed a workaround: bring the docker compose project up (but do not start it), which creates the appropriate data volume. Then unpack the nitro.tar archive into it, adjust the permissions to match the UID/GID of the user inside the Docker container (UID 1000 / GID 1000), and finally start the container, which will find the extracted data and resume from there as normal.
Here is a demonstration of the workaround on a virtual machine running Pop!_OS (an Ubuntu variant):
test@pop-os:~$ sudo docker-compose up --no-start
Creating network "test_default" with the default driver
Creating nitro ... done
test@pop-os:~$ wget https://snapshot.arbitrum.io/mainnet/nitro.tar
[wait for download to complete, roughly 230 GB]
test@pop-os:~$ sudo su -
root@pop-os:~# cd /var/lib/docker/volumes/nitro_node_data/_data/
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# mkdir arb1
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# tar xvf ~test/nitro.tar -C arb1
[the snapshot will be extracted]
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# chown -R 1000:1000 arb1
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# exit
test@pop-os:~$ sudo docker-compose up
[here is some output from a successful startup]
Starting nitro ... done
Attaching to nitro
nitro | INFO [05-02|03:58:36.196] created jwt file fileName=/home/user/.arbitrum/arb1/nitro/jwtsecret
nitro | INFO [05-02|03:58:36.196] Running Arbitrum nitro node revision=v2.0.13-174496c vcs.time=2023-03-28T17:34:23-06:00
nitro | INFO [05-02|03:58:36.196] connected to l1 chain l1url=https://rpc.ankr.com/eth l1chainid=1
nitro | INFO [05-02|03:58:36.197] Allocated cache and file handles database=/home/user/.arbitrum/arb1/nitro/l2chaindata cache=16.00MiB handles=16 readonly=true
nitro | INFO [05-02|03:58:39.750] Found legacy ancient chain path location=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient
nitro | INFO [05-02|03:58:39.754] Opened ancient database database=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient readonly=true
nitro | INFO [05-02|03:58:39.797] Allocated cache and file handles database=/home/user/.arbitrum/arb1/nitro/l2chaindata cache=2.00GiB handles=512
nitro | INFO [05-02|03:58:43.224] Found legacy ancient chain path location=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient
nitro | INFO [05-02|03:58:43.226] Opened ancient database database=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient readonly=false
nitro | INFO [05-02|03:58:44.237] Loaded most recent local header number=22,207,816 hash=a903d8..20d8fe td=0 age=8mo3d13h
nitro | INFO [05-02|03:58:44.237] Loaded most recent local full block number=22,207,817 hash=7d237d..c07986 td=1 age=8mo3d13h
nitro | INFO [05-02|03:58:44.237] Loaded most recent local fast block number=22,207,816 hash=a903d8..20d8fe td=0 age=8mo3d13h
nitro | INFO [05-02|03:58:44.242] Allocated cache and file handles database=/home/user/.arbitrum/arb1/nitro/arbitrumdata cache=16.00MiB handles=16
nitro | INFO [05-02|03:58:44.257] Allocated cache and file handles database=/home/user/.arbitrum/arb1/nitro/classic-msg cache=16.00MiB handles=16 readonly=true
nitro | INFO [05-02|03:58:44.282] Upgrading chain index type=bloombits percentage=0
nitro | INFO [05-02|03:58:44.283] Starting peer-to-peer node instance=nitro/vv2.0.13-174496c/linux-amd64/go1.19.7
nitro | WARN [05-02|03:58:44.283] P2P server will be useless, neither dialing nor listening
nitro | INFO [05-02|03:58:44.294] Loaded JWT secret file path=/home/user/.arbitrum/arb1/nitro/jwtsecret crc32=0xfb35acf4
nitro | INFO [05-02|03:58:44.294] HTTP server started endpoint=[::]:8547 auth=false prefix= cors=* vhosts=*
nitro | INFO [05-02|03:58:44.294] WebSocket enabled url=ws://[::]:8548
nitro | INFO [05-02|03:58:44.294] WebSocket enabled url=ws://127.0.0.1:8549
nitro | INFO [05-02|03:58:44.294] HTTP server started endpoint=127.0.0.1:8549 auth=true prefix= cors=localhost vhosts=
nitro | INFO [05-02|03:58:44.295] New local node record seq=1,682,999,924,294 id=30fe73f1a6f560a9 ip=127.0.0.1 udp=0 tcp=0
nitro | INFO [05-02|03:58:44.295] Started P2P networking self=enode://f00f453b94a4b1d7416c32d5a6dcf4c708d7cfb46be5293743d452e08f9cea4fb0f23561eeeeda1de60370460937646e88343af03357a41aa905e6d7b78a13af@127.0.0.1:0
nitro | WARN [05-02|03:58:44.298] Error reading unclean shutdown markers error="leveldb: not found"
nitro | INFO [05-02|03:58:44.298] InboxTracker SequencerBatchCount=0
nitro | WARN [05-02|03:58:44.781] empty sequencer message
nitro | WARN [05-02|03:58:44.781] reading virtual delayed message segment delayedMessagesRead=0 afterDelayedMessages=1
nitro | INFO [05-02|03:58:44.781] InboxTracker sequencerBatchCount=1 messageCount=1 l1Block=15,411,056 l1Timestamp=2022-08-25T20:05:44+0000
nitro | INFO [05-02|03:58:52.069] InboxTracker sequencerBatchCount=2 messageCount=722 l1Block=15,447,728 l1Timestamp=2022-08-31T16:43:24+0000
nitro | INFO [05-02|03:58:52.551] Upgrading chain index type=bloombits percentage=2
nitro | INFO [05-02|03:58:52.951] created block l2Block=22,207,818 l2BlockHash=d1882c..7f8e2f l1Block=15,447,473 l1Timestamp=2022-08-31T15:45:26+0000
nitro | INFO [05-02|03:58:53.953] created block l2Block=22,207,826 l2BlockHash=004362..f199d0 l1Block=15,447,567 l1Timestamp=2022-08-31T16:08:14+0000
nitro | INFO [05-02|03:58:53.956] Indexing transactions blocks=18463 txs=20971 tail=22,189,052 total=22,207,515 elapsed=1.004s
[...]
NOTE: The compose file above uses native Docker volumes, which are stored in /var/lib/docker/volumes
on a Linux system. If you are running on macOS or Windows, adjust the paths and exact commands as needed to extract the snapshot to the correct location and set the permissions correctly.
Executor Contract
The contract implements a simple V3 callback that checks the pool address and repays a swap if a valid callback is made. Nothing exotic here beyond cleaning up some external calls by using an interface instead of raw_call.
arbitrum_executor_v3.vy
# @version ^0.3
from vyper.interfaces import ERC20 as IERC20
interface IWETH:
def deposit(): payable
interface IUniswapV3Pool:
def factory() -> address: view
def fee() -> uint24: view
def tickSpacing() -> int24: view
def token0() -> address: view
def token1() -> address: view
def maxLiquidityPerTick() -> uint128: view
OWNER_ADDR: immutable(address)
WETH_ADDR: constant(address) = 0x82aF49447D8a07e3bd95BD0d56f35241523fBab1
V3_FACTORY: constant(address) = 0x1F98431c8aD98523631AE4a59f267346ea31F984
MAX_PAYLOADS: constant(uint256) = 16
MAX_PAYLOAD_BYTES: constant(uint256) = 1024
struct payload:
target: address
calldata: Bytes[MAX_PAYLOAD_BYTES]
value: uint256
@external
@payable
def __init__():
OWNER_ADDR = msg.sender
# wrap initial Ether to WETH
if msg.value > 0:
IWETH(WETH_ADDR).deposit(value=msg.value)
@external
@payable
def execute_payloads(
payloads: DynArray[payload, MAX_PAYLOADS],
):
assert msg.sender == OWNER_ADDR, "!OWNER"
for _payload in payloads:
raw_call(
_payload.target,
_payload.calldata,
value=_payload.value,
)
@internal
@pure
def verifyCallback(
tokenA: address,
tokenB: address,
fee: uint24
) -> address:
token0: address = tokenA
token1: address = tokenB
if convert(tokenA,uint160) > convert(tokenB,uint160):
token0 = tokenB
token1 = tokenA
return convert(
slice(
convert(
convert(
keccak256(
concat(
b'\xFF',
convert(V3_FACTORY,bytes20),
keccak256(
_abi_encode(
token0,
token1,
fee
)
),
0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54,
)
),
uint256
),
bytes32
),
12,
20,
),
address
)
@external
def uniswapV3SwapCallback(
amount0_delta: int256,
amount1_delta: int256,
data: Bytes[32]
):
assert amount0_delta > 0 or amount1_delta > 0, "REJECTED 0 LIQUIDITY SWAP"
# get the token0/token1 addresses and fee reported by msg.sender
token0: address = IUniswapV3Pool(msg.sender).token0()
token1: address = IUniswapV3Pool(msg.sender).token1()
fee: uint24 = IUniswapV3Pool(msg.sender).fee()
assert msg.sender == self.verifyCallback(token0,token1,fee), "!V3LP"
# transfer token back to pool
if amount0_delta > 0:
IERC20(token0).transfer(msg.sender,convert(amount0_delta, uint256))
elif amount1_delta > 0:
IERC20(token1).transfer(msg.sender,convert(amount1_delta, uint256))
@external
@payable
def __default__():
# accept basic Ether transfers to the contract with no calldata
if len(msg.data) == 0:
return
# revert on all other calls
else:
raise
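For reference, here is a rough sketch of driving the deployed contract from Brownie. The executor address, the keystore name, and the 0.01 WETH transfer are placeholders for illustration only; the payload tuples follow the payload struct layout above (target, calldata, value).

import eth_abi
from brownie import Contract, accounts, network

network.connect("arbitrum-local")

owner = accounts.load("deployer")  # must be the account that deployed the contract (OWNER_ADDR)
executor = Contract("0x0000000000000000000000000000000000000000")  # replace with your deployment

WETH = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"

# example payload: send 0.01 WETH held by the executor back to the owner
transfer_calldata = (
    bytes.fromhex("a9059cbb")  # transfer(address,uint256) selector
    + eth_abi.encode(["address", "uint256"], [owner.address, 10**16])  # encode_abi() on older eth-abi
)

payloads = [(WETH, transfer_calldata, 0)]

executor.execute_payloads(payloads, {"from": owner})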
LP Fetcher and Parser Scripts
arbitrum_lp_fetcher_uniswapv2_json.py
from brownie import network, Contract, chain
import sys
import json
import time
BROWNIE_NETWORK = "arbitrum-local"
# starting # of blocks to request with getLogs
BLOCK_SPAN = 5_000
try:
network.connect(BROWNIE_NETWORK)
except:
sys.exit("Could not connect!")
exchanges = [
{
"name": "SushiSwap",
"filename": "arbitrum_sushiswap_lps.json",
"factory_address": "0xc35DADB65012eC5796536bD9864eD8773aBc74C4",
"factory_deployment_block": 70,
},
]
newest_block = chain.height
for name, factory_address, filename, deployment_block in [
(
exchange["name"],
exchange["factory_address"],
exchange["filename"],
exchange["factory_deployment_block"],
)
for exchange in exchanges
]:
print(f"DEX: {name}")
try:
factory_contract = Contract(factory_address)
except:
try:
factory_contract = Contract.from_explorer(factory_address)
except:
factory_contract = None
finally:
if factory_contract is None:
sys.exit("FACTORY COULD NOT BE LOADED")
try:
with open(filename) as file:
lp_data = json.load(file)
except FileNotFoundError:
lp_data = []
if lp_data:
previous_pool_count = len(lp_data)
print(f"Found previously-fetched data: {previous_pool_count} pools")
previous_block = lp_data[-1].get("block_number")
print(f"Found pool data up to block {previous_block}")
else:
previous_pool_count = 0
previous_block = deployment_block
failure = False
start_block = previous_block + 1
while True:
if failure:
# reduce the working span by 10%
BLOCK_SPAN = int(0.9 * BLOCK_SPAN)
else:
# increase the working span by .1%
BLOCK_SPAN = int(1.001 * BLOCK_SPAN)
        # stop once every block up to the chain tip has been fetched
        if start_block > newest_block:
            break
        end_block = min(newest_block, start_block + BLOCK_SPAN)
try:
pool_created_events = factory_contract.events.PairCreated.getLogs(
fromBlock=start_block, toBlock=end_block
)
except ValueError as e:
failure = True
time.sleep(1)
continue
else:
print(
f"Fetched PairCreated events, block range [{start_block},{end_block}]"
)
# set the next start block
start_block = end_block + 1
failure = False
for event in pool_created_events:
lp_data.append(
{
"pool_address": event.args.get("pair"),
"token0": event.args.get("token0"),
"token1": event.args.get("token1"),
"block_number": event.get("blockNumber"),
"pool_id": event.args.get(""),
"type": "UniswapV2",
}
)
with open(filename, "w") as file:
json.dump(lp_data, file, indent=2)
print(f"Saved {len(lp_data) - previous_pool_count} new pools")
arbitrum_lp_fetcher_uniswapv3_json.py
from brownie import network, Contract, chain
import sys
import json
import time
BROWNIE_NETWORK = "arbitrum-local"
# starting block span to process with getLogs
BLOCK_SPAN = 5_000
try:
network.connect(BROWNIE_NETWORK)
except:
sys.exit("Could not connect!")
exchanges = [
{
"name": "Uniswap V3",
"filename": "arbitrum_uniswapv3_lps.json",
"factory_address": "0x1f98431c8ad98523631ae4a59f267346ea31f984",
"factory_deployment_block": 165,
},
]
newest_block = chain.height
for name, factory_address, filename, deployment_block in [
(
exchange["name"],
exchange["factory_address"],
exchange["filename"],
exchange["factory_deployment_block"],
)
for exchange in exchanges
]:
print(f"DEX: {name}")
try:
factory_contract = Contract(factory_address)
except:
try:
factory_contract = Contract.from_explorer(factory_address)
except:
factory_contract = None
finally:
if factory_contract is None:
sys.exit("FACTORY COULD NOT BE LOADED")
try:
with open(filename) as file:
lp_data = json.load(file)
except FileNotFoundError:
lp_data = []
if lp_data:
previous_pool_count = len(lp_data)
print(f"Found previously-fetched data: {previous_pool_count} pools")
previous_block = lp_data[-1].get("block_number")
print(f"Found pool data up to block {previous_block}")
else:
previous_pool_count = 0
previous_block = deployment_block
failure = False
start_block = previous_block + 1
while True:
if failure:
BLOCK_SPAN = int(0.9 * BLOCK_SPAN)
# reduce the working span by 10%
else:
# increase the working span by .1%
BLOCK_SPAN = int(1.001 * BLOCK_SPAN)
        # stop once every block up to the chain tip has been fetched
        if start_block > newest_block:
            break
        end_block = min(newest_block, start_block + BLOCK_SPAN)
try:
pool_created_events = factory_contract.events.PoolCreated.getLogs(
fromBlock=start_block, toBlock=end_block
)
except ValueError as e:
failure = True
time.sleep(1)
continue
else:
print(
f"Fetched PoolCreated events, block range [{start_block},{end_block}]"
)
# set the next start block
start_block = end_block + 1
failure = False
# print(pool_created_events)
for event in pool_created_events:
lp_data.append(
{
"pool_address": event.args.get("pool"),
"fee": event.args.get("fee"),
"token0": event.args.get("token0"),
"token1": event.args.get("token1"),
"block_number": event.get("blockNumber"),
"type": "UniswapV3",
}
)
with open(filename, "w") as file:
json.dump(lp_data, file, indent=2)
print(f"Saved {len(lp_data) - previous_pool_count} new pools")
arbitrum_lp_fetcher_camelot_json.py
from brownie import network, Contract, chain
import sys
import json
import time
BROWNIE_NETWORK = "arbitrum-local"
# starting # of blocks to request with getLogs
BLOCK_SPAN = 5_000
try:
network.connect(BROWNIE_NETWORK)
except:
sys.exit("Could not connect!")
exchanges = [
{
"name": "Camelot",
"filename": "arbitrum_camelot_lps.json",
"factory_address": "0x6EcCab422D763aC031210895C81787E87B43A652",
"factory_deployment_block": 35061163,
},
]
newest_block = chain.height
for name, factory_address, filename, deployment_block in [
(
exchange["name"],
exchange["factory_address"],
exchange["filename"],
exchange["factory_deployment_block"],
)
for exchange in exchanges
]:
print(f"DEX: {name}")
try:
factory_contract = Contract(factory_address)
except:
try:
factory_contract = Contract.from_explorer(factory_address)
except:
factory_contract = None
finally:
if factory_contract is None:
sys.exit("FACTORY COULD NOT BE LOADED")
try:
with open(filename) as file:
lp_data = json.load(file)
except FileNotFoundError:
lp_data = []
if lp_data:
previous_pool_count = len(lp_data)
print(f"Found previously-fetched data: {previous_pool_count} pools")
previous_block = lp_data[-1].get("block_number")
print(f"Found pool data up to block {previous_block}")
else:
previous_pool_count = 0
previous_block = deployment_block
failure = False
start_block = previous_block + 1
while True:
if failure:
# reduce the working span by 10%
BLOCK_SPAN = int(0.9 * BLOCK_SPAN)
else:
# increase the working span by .1%
BLOCK_SPAN = int(1.001 * BLOCK_SPAN)
        # stop once every block up to the chain tip has been fetched
        if start_block > newest_block:
            break
        end_block = min(newest_block, start_block + BLOCK_SPAN)
try:
pool_created_events = factory_contract.events.PairCreated.getLogs(
fromBlock=start_block, toBlock=end_block
)
except ValueError as e:
failure = True
time.sleep(1)
continue
else:
print(
f"Fetched PairCreated events, block range [{start_block},{end_block}]"
)
# set the next start block
start_block = end_block + 1
failure = False
for event in pool_created_events:
lp_data.append(
{
"pool_address": event.args.get("pair"),
"token0": event.args.get("token0"),
"token1": event.args.get("token1"),
"block_number": event.get("blockNumber"),
"pool_id": event.args.get("length"),
"type": "CamelotV2",
}
)
with open(filename, "w") as file:
json.dump(lp_data, file, indent=2)
print(f"Saved {len(lp_data) - previous_pool_count} new pools")
arbitrum_uniswapv3_liquidity_events_fetcher.py
import asyncio
import json
import sys
import brownie
import web3
from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data
import degenbot as bot
NODE_URI_HTTP = "http://localhost:8547"
NODE_URI_WS = "ws://localhost:8548"
BROWNIE_NETWORK = "arbitrum-local"
UNISWAPV3_START_BLOCK = 165
async def prime_pools():
print("Starting pool primer")
liquidity_snapshot = {}
try:
with open("arbitrum_liquidity_snapshot.json", "r") as file:
json_liquidity_snapshot = json.load(file)
except:
snapshot_last_block = None
else:
snapshot_last_block = json_liquidity_snapshot["snapshot_block"]
print(
f"Loaded LP snapshot: {len(json_liquidity_snapshot)} pools @ block {snapshot_last_block}"
)
assert (
snapshot_last_block < newest_block
), f"Aborting, snapshot block ({snapshot_last_block}) is newer than current chain height ({newest_block})"
for pool_address, snapshot in [
(k, v)
for k, v in json_liquidity_snapshot.items()
if k not in ["snapshot_block"]
]:
liquidity_snapshot[pool_address] = {
"tick_bitmap": {
int(k): v for k, v in snapshot["tick_bitmap"].items()
},
"tick_data": {
int(k): v for k, v in snapshot["tick_data"].items()
},
}
V3LP = w3.eth.contract(abi=bot.uniswap.v3.abi.UNISWAP_V3_POOL_ABI)
liquidity_events = {}
for event in [V3LP.events.Mint, V3LP.events.Burn]:
print(f"processing {event.event_name} events")
start_block = (
max(UNISWAPV3_START_BLOCK, snapshot_last_block + 1)
if snapshot_last_block is not None
else UNISWAPV3_START_BLOCK
)
block_span = 25_000
done = False
event_abi = event._get_event_abi()
while not done:
end_block = min(newest_block, start_block + block_span)
_, event_filter_params = construct_event_filter_params(
event_abi=event_abi,
abi_codec=w3.codec,
argument_filters={},
fromBlock=start_block,
toBlock=end_block,
)
try:
event_logs = w3.eth.get_logs(event_filter_params)
except:
print(f"TIMEOUT: {block_span=}")
block_span = int(0.5 * block_span)
continue
for event in event_logs:
decoded_event = get_event_data(w3.codec, event_abi, event)
pool_address = decoded_event["address"]
block = decoded_event["blockNumber"]
tx_index = decoded_event["transactionIndex"]
liquidity = decoded_event["args"]["amount"] * (
-1 if decoded_event["event"] == "Burn" else 1
)
tick_lower = decoded_event["args"]["tickLower"]
tick_upper = decoded_event["args"]["tickUpper"]
# skip zero liquidity events
if liquidity == 0:
continue
try:
liquidity_events[pool_address]
except KeyError:
liquidity_events[pool_address] = []
liquidity_events[pool_address].append(
(
block,
tx_index,
(
liquidity,
tick_lower,
tick_upper,
),
)
)
print(f"Fetched events: block span [{start_block},{end_block}]")
if end_block == newest_block:
done = True
else:
start_block = end_block + 1
block_span = int(1.01 * block_span)
for pool_address in liquidity_events.keys():
try:
snapshot_tick_data = liquidity_snapshot[pool_address]["tick_data"]
except KeyError:
snapshot_tick_data = {}
try:
snapshot_tick_bitmap = liquidity_snapshot[pool_address][
"tick_bitmap"
]
except KeyError:
snapshot_tick_bitmap = {}
try:
lp_helper = bot.V3LiquidityPool(
address=pool_address,
silent=True,
tick_data=snapshot_tick_data,
tick_bitmap=snapshot_tick_bitmap,
)
except Exception as e:
# print(f"EXCEPTION - pool {pool_address}: {e}")
continue
sorted_liquidity_events = sorted(
liquidity_events[pool_address],
key=lambda event: (event[0], event[1]),
)
for liquidity_event in sorted_liquidity_events:
(
event_block,
_,
(liquidity_delta, tick_lower, tick_upper),
) = liquidity_event
lp_helper.external_update(
updates={
"liquidity_change": (
liquidity_delta,
tick_lower,
tick_upper,
)
},
block_number=event_block,
fetch_missing=False,
force=True,
)
liquidity_snapshot[pool_address] = {
"tick_data": lp_helper.tick_data,
"tick_bitmap": {
key: value
for key, value in lp_helper.tick_bitmap.items()
if key != "sparse" # ignore sparse key
if value["bitmap"]
},
}
liquidity_snapshot["snapshot_block"] = newest_block
with open("arbitrum_liquidity_snapshot.json", "w") as file:
json.dump(liquidity_snapshot, file, indent=2)
print("Wrote LP snapshot")
# Create a reusable web3 object to communicate with the node
w3 = web3.Web3(web3.HTTPProvider(NODE_URI_HTTP))
try:
brownie.network.connect(BROWNIE_NETWORK)
except:
sys.exit(
"Could not connect! Verify your Brownie network settings using 'brownie networks list'"
)
newest_block = brownie.chain.height
if __name__ == "__main__":
asyncio.run(prime_pools())
print("Complete")
arbitrum_parser_2pool.py
import json
import web3
import networkx as nx
import itertools
WETH_ADDRESS = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"
w3 = web3.Web3()
BLACKLISTED_TOKENS = [
# "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", # USDC
# "0xdAC17F958D2ee523a2206206994597C13D831ec7", # USDT
]
sushi_v2_lp_data = {}
for filename in [
"arbitrum_sushiswap_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
sushi_v2_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["pool_id"]
}
print(f"Found {len(sushi_v2_lp_data)} V2 pools")
camelot_v2_lp_data = {}
for filename in [
"arbitrum_camelot_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
camelot_v2_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["pool_id"]
}
print(f"Found {len(camelot_v2_lp_data)} V2 pools")
v3_lp_data = {}
for filename in [
"arbitrum_uniswapv3_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
v3_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["block_number"]
}
print(f"Found {len(v3_lp_data)} V3 pools")
all_v2_pools = set(sushi_v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())
all_tokens = set(
[lp.get("token0") for lp in sushi_v2_lp_data.values()]
+ [lp.get("token1") for lp in sushi_v2_lp_data.values()]
+ [lp.get("token0") for lp in v3_lp_data.values()]
+ [lp.get("token1") for lp in v3_lp_data.values()]
)
# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in sushi_v2_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV2",
)
for pool in camelot_v2_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="CamelotV2",
)
for pool in v3_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV3",
)
# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)
print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")
all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")
print("*** Finding two-pool arbitrage paths ***")
two_pool_arb_paths = {}
for token in all_tokens_with_weth_pool:
pools = G.get_edge_data(token, WETH_ADDRESS).values()
# skip tokens with only one pool
if len(pools) < 2:
continue
for pool_a, pool_b in itertools.permutations(pools, 2):
if pool_a.get("pool_type") == "UniswapV2":
pool_a_dict = sushi_v2_lp_data.get(pool_a.get("lp_address"))
elif pool_a.get("pool_type") == "CamelotV2":
pool_a_dict = camelot_v2_lp_data.get(pool_a.get("lp_address"))
elif pool_a.get("pool_type") == "UniswapV3":
pool_a_dict = v3_lp_data.get(pool_a.get("lp_address"))
else:
raise Exception(f"could not identify pool {pool_a}")
if pool_b.get("pool_type") == "UniswapV2":
pool_b_dict = sushi_v2_lp_data.get(pool_b.get("lp_address"))
elif pool_b.get("pool_type") == "CamelotV2":
pool_b_dict = camelot_v2_lp_data.get(pool_b.get("lp_address"))
elif pool_b.get("pool_type") == "UniswapV3":
pool_b_dict = v3_lp_data.get(pool_b.get("lp_address"))
else:
raise Exception(f"could not identify pool {pool_b}")
two_pool_arb_paths[id] = {
"id": (
id := w3.keccak(
hexstr="".join(
[
pool_a.get("lp_address")[2:],
pool_b.get("lp_address")[2:],
]
)
).hex()
),
"pools": {
pool_a.get("lp_address"): pool_a_dict,
pool_b.get("lp_address"): pool_b_dict,
},
"arb_types": ["cycle", "flash_borrow_lp_swap"],
"path": [pool.get("lp_address") for pool in [pool_a, pool_b]],
}
print(f"Found {len(two_pool_arb_paths)} unique two-pool arbitrage paths")
print("• Saving arb paths to JSON")
with open("arbitrum_arbs_2pool.json", "w") as file:
json.dump(two_pool_arb_paths, file, indent=2)
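As a quick check of the output, the file written above can be loaded and summarized like this (each entry keys a dict holding the two pool definitions and the swap path):

import json

with open("arbitrum_arbs_2pool.json", "r") as file:
    two_pool_arb_paths = json.load(file)

print(f"Loaded {len(two_pool_arb_paths)} two-pool arb paths")

# inspect the first path: its ID, the swap path, and the pool types involved
arb_id, arb = next(iter(two_pool_arb_paths.items()))
pool_types = [pool["type"] for pool in arb["pools"].values()]
print(arb_id)
print(" -> ".join(arb["path"]))
print(" + ".join(pool_types))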
arbitrum_parser_3pool.py
import json
import web3
import networkx as nx
import itertools
import time
WETH = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"
w3 = web3.Web3()
start_timer = time.monotonic()
BLACKLISTED_TOKENS = [
# "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", # USDC
# "0xdAC17F958D2ee523a2206206994597C13D831ec7", # USDT
]
v2_lp_data = {}
for filename in [
"arbitrum_sushiswap_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
v2_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["pool_id"]
}
print(f"Found {len(v2_lp_data)} V2 pools")
v3_lp_data = {}
for filename in [
"arbitrum_uniswapv3_lps.json",
]:
with open(filename) as file:
for pool in json.load(file):
v3_lp_data[pool.get("pool_address")] = {
key: value
for key, value in pool.items()
if key not in ["block_number"]
}
print(f"Found {len(v3_lp_data)} V3 pools")
all_v2_pools = set(v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())
# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in v2_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV2",
)
for pool in v3_lp_data.values():
G.add_edge(
pool.get("token0"),
pool.get("token1"),
lp_address=pool.get("pool_address"),
pool_type="UniswapV3",
)
# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)
print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")
all_tokens_with_weth_pool = list(G.neighbors(WETH))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")
print("*** Finding triangular arbitrage paths ***")
triangle_arb_paths = {}
# only consider tokens (nodes) with degree > 1, which signifies that the token is held by more than one pool
filtered_tokens = [
token for token in all_tokens_with_weth_pool if G.degree(token) > 1
]
print(f"Processing {len(filtered_tokens)} tokens with degree > 1")
# loop through all possible token pairs
for token_a, token_b in itertools.combinations(filtered_tokens, 2):
# find tokenA/tokenB pools, skip if a tokenA/tokenB pool is not found
if not G.get_edge_data(token_a, token_b):
continue
inside_pools = [
edge.get("lp_address")
for edge in G.get_edge_data(token_a, token_b).values()
]
# find tokenA/WETH pools
outside_pools_tokenA = [
edge.get("lp_address")
for edge in G.get_edge_data(token_a, WETH).values()
]
# find tokenB/WETH pools
outside_pools_tokenB = [
edge.get("lp_address")
for edge in G.get_edge_data(token_b, WETH).values()
]
# find all triangular arbitrage paths of form:
# tokenA/WETH -> tokenA/tokenB -> tokenB/WETH
for pool_addresses in itertools.product(
outside_pools_tokenA, inside_pools, outside_pools_tokenB
):
pool_data = {}
for pool_address in pool_addresses:
if pool_address in all_v2_pools:
pool_info = {
pool_address: {
key: value
for key, value in v2_lp_data.get(pool_address).items()
}
}
elif pool_address in all_v3_pools:
pool_info = {
pool_address: {
key: value
for key, value in v3_lp_data.get(pool_address).items()
}
}
else:
raise Exception
pool_data.update(pool_info)
triangle_arb_paths[id] = {
"id": (
id := w3.keccak(
hexstr="".join(
[pool_address[2:] for pool_address in pool_addresses]
)
).hex()
),
"path": pool_addresses,
"pools": pool_data,
}
# find all triangular arbitrage paths of form:
# tokenB/WETH -> tokenA/tokenB -> tokenA/WETH
for pool_addresses in itertools.product(
outside_pools_tokenB, inside_pools, outside_pools_tokenA
):
pool_data = {}
for pool_address in pool_addresses:
if pool_address in all_v2_pools:
pool_info = {
pool_address: {
key: value
for key, value in v2_lp_data.get(pool_address).items()
}
}
elif pool_address in all_v3_pools:
pool_info = {
pool_address: {
key: value
for key, value in v3_lp_data.get(pool_address).items()
}
}
else:
raise Exception
pool_data.update(pool_info)
triangle_arb_paths[id] = {
"id": (
id := w3.keccak(
hexstr="".join(
[pool_address[2:] for pool_address in pool_addresses]
)
).hex()
),
"path": pool_addresses,
"pools": pool_data,
}
print(
f"Found {len(triangle_arb_paths)} triangle arb paths in {time.monotonic() - start_timer:.1f}s"
)
print("• Saving pool data to JSON")
with open("arbitrum_arbs_3pool.json", "w") as file:
json.dump(triangle_arb_paths, file, indent=2)
Bot Discussion
The bot implements some nice new features that you should check out.
Graceful Shutdown — no more tracebacks when you stop a bot with CTRL+C.
Camelot DEX — Camelot recently launched. It is a Sushiswap fork that offers split-fee Uniswap V2 pools (the fee for token0 is different from the fee for token1, but otherwise the same calculations are made). I have modified the LiquidityPool degenbot helper to support these split-fee pools, and the bot will load them if found. A sketch of the split-fee math appears after this list.
Liquidity Snapshots — if the bot finds a file named arbitrum_liquidity_snapshot.json (which is created by arbitrum_uniswapv3_liquidity_events_fetcher.py), it will prime the V3LiquidityPool helpers with complete liquidity information during startup, then update them to the current block height afterwards. No more endless liquidity data fetching! This is a huge startup time improvement for bots doing V3 arbitrage.
Pre-activated Arbitrage Helpers — after an arbitrage helper has been activated (generated a valid arbitrage payload that was tested via eth_call without reverting), its ID is added to a file named arbitrum_activated_arbs.json. If the bot finds this file on startup, all arbitrage helpers matching these IDs are moved to the active_arbs dictionary. They still need to be updated and recalculated for the current pool state as usual, but the gas testing (and re-testing on each startup) is skipped.
Prioritized Activation — the activate_arbs coroutine used to do a simple loop through all arbitrage helpers. Fine enough, but simplistic: arbs at the end of the dictionary would take a long time to be activated. Now the coroutine prioritizes activating arbitrage helpers in the following order: (1) inactive arbitrage helpers with a recently-activated pool, (2) pre-activated arbitrage helpers per the note above, (3) inactive arbitrage helpers in small batches, working sequentially through the complete list. This results in finding opportunities faster after a restart, since fresh and historically good opportunities are prioritized over stale ones.
“Low-Hanging Fruit” Sweeps — instead of trying to compete with the infrastructure groups, this bot attempts to find arbitrage opportunities that they have missed. Rather than submitting payloads every block, it simply identifies profitable arbs every 10 seconds (configurable, of course) and submits a transaction to capture each one. This has allowed me to capture a lot of very small arbs that nobody else seemed to notice (most are between 10 and 25 cents worth of WETH) with little risk of reverts. NOTE: hundreds of readers now know about this and have working code to secure it, so don’t expect this to last long.
RPC Execution — Flashbots isn’t available on Arbitrum, so we have to arb the old-fashioned way. Feel free to use, adapt, and improve the execute_arb_with_rpc method for any blockchain where you need to submit transactions directly instead of through a 3rd-party relay API.
Triangle Arb Prioritization — the watch_events coroutine is responsible for selecting arbitrage paths after finding an update for a particular pool. Two-pool paths are straightforward, but triangle (3-pool) paths can quickly spiral out of control (a Uniswap V3 DEX can have 4-5 pools for each pair at various fee levels, plus the V2 forks, plus the reverse paths). To cut down on excessive calculation (until I implement parallel calculation, another can of worms), this bot selects only 3-pool arbitrage paths where the updated pool is in the middle position. This eliminates a lot of “nuisance” recalculations, with the assumption that an opportunity created at an edge pool will be more profitable in a 2-pool path than in a 3-pool path with the middle hop close to parity. NOTE: you can modify this easily if you prefer to calculate all the opportunities.
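Here is the split-fee sketch referenced in the Camelot note above. It is ordinary constant-product math with the fee chosen by swap direction; the parts-per-100,000 fee denominator and the sample fee values are assumptions for illustration, so check the pool contract for the real values.

def split_fee_amount_out(
    amount_in: int,
    reserve_in: int,
    reserve_out: int,
    fee: int,  # fee charged on the *input* token, e.g. 300 = 0.3%
    fee_denominator: int = 100_000,  # assumed denominator, verify on-chain
) -> int:
    # standard x*y=k output calculation with the direction-dependent fee applied
    amount_in_after_fee = amount_in * (fee_denominator - fee)
    return (amount_in_after_fee * reserve_out) // (
        reserve_in * fee_denominator + amount_in_after_fee
    )


# the same pool can charge different fees for token0->token1 and token1->token0
out_0_to_1 = split_fee_amount_out(10**18, 500 * 10**18, 400 * 10**18, fee=300)
out_1_to_0 = split_fee_amount_out(10**18, 400 * 10**18, 500 * 10**18, fee=500)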
Bot Source Code
arbitrum_cycle.py