Degen Code

Arbitrum Botting For Fun and Profit

Lessons Learned From Layer Two

Apr 25, 2023

I’ve been experimenting on Arbitrum for the last month or so. It is a convenient place to practice live botting against other searchers. Uniswap V3 is deployed on Arbitrum, plus Uniswap V2 forks like Sushiswap. It is EVM-compatible so mainnet contracts can be deployed with minor modification, it’s fast, and it’s cheap!

It is by far the most popular Ethereum layer two blockchain, so you’ll find plenty of opportunity if you look into it.

I am going to share example code that will let you run and test WETH cycle arbitrage on Arbitrum. It is not optimized but is fairly well-tested and competitive enough for low-effort experimentation. I’ve profited roughly a quarter ETH over ~1000 transactions (with some hilarious misfires along the way).

You’re not going to get rich copy-pasting (remember, hundreds of other readers will be reading this same post) but it should give you a great starting place to begin modifications if you want to chase some more exotic opportunities.

I will shut the bot down after publishing this so you won’t be going against me at least.

And as always, you should inspect the code, understand how it works, and ask me questions! If you allow autonomous code to spend your ETH, you get to keep the profits AND the losses. So please be responsible and careful but remember to have some fun (this is fun, remember?)

If you don’t care about Arbitrum, the bot code is still worth a look: it is built from the Flashbots projects from a few months ago. I have continued to improve the code and have added some nice new features you’ll likely want to implement.

Arbitrum Differences Compared to Mainnet

No Mempool

Arbitrum does not have a mempool. The closest you can get is to Read the Arbitrum Sequencer Feed. The sequencer publishes an ordered list of transactions that will be recorded to mainnet. Each transaction is published slightly before the block is built, so you have a slight advantage compared to those watching ex post facto chain updates.

The sequencer only publishes a transaction after it has guaranteed the ordering. Since the queue cannot be re-ordered, frontrunning and sandwiching are impossible. Backrunning is still possible, but that changes the game to be entirely latency-based. The sequencer is first-come, first-served (though a priority queue mechanism has been recently proposed), so whoever can deliver their backrun to the sequencer first will secure the profit.

If your infrastructure is not set up in a low-latency data center in the same building as the Arbitrum sequencer, you’re not going to win this game. I’m running a node from my home, which can never compete with the co-location crew, so I don’t even bother competing.

Variable Gas Use

On mainnet, gas use is a function of storage use and computation cost. Assuming the EVM has not been changed via hardfork, the gas use for a transaction will be more or less constant.

Since Arbitrum records final state to Ethereum, the cost of a transaction depends on the computation performed on the L2 as well as the storage cost on L1. The Arbitrum fee mechanism is outlined in 2-Dimensional Fees. The strange thing about Arbitrum gas is that the gas price scales only with L2 activity, while the fluctuating L1 cost shows up in the amount of gas used.

An Arbitrum transaction will have different gas use depending on the state of mainnet when it is executed.
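
To make that concrete, here is a rough sketch of how the L1 cost can end up inside the gas figure. It assumes the L1 data fee is converted into extra L2 gas by dividing it by the L2 gas price, which is how I read the Arbitrum gas estimation docs. The function name and every number below are illustrative, not measurements, and the exact rounding used by Nitro is not modeled.

```python
def arbitrum_gas_used(
    l2_gas: int,
    l1_calldata_bytes: int,
    l1_price_per_byte_wei: int,
    l2_gas_price_wei: int,
) -> int:
    """Fold the L1 data cost into L2 gas units (hypothetical helper)."""
    # wei owed for posting the transaction data to mainnet
    l1_cost_wei = l1_calldata_bytes * l1_price_per_byte_wei
    # that cost is paid for by buying extra gas at the L2 gas price
    l1_gas_buffer = l1_cost_wei // l2_gas_price_wei
    return l2_gas + l1_gas_buffer


# Illustrative values only
L2_GAS = 400_000  # computation and storage on Arbitrum
CALLDATA_BYTES = 500  # size of the data posted to L1
L2_GAS_PRICE = 100_000_000  # 0.1 gwei

# Same transaction, two different mainnet conditions
quiet_l1 = arbitrum_gas_used(L2_GAS, CALLDATA_BYTES, 200_000_000_000, L2_GAS_PRICE)
busy_l1 = arbitrum_gas_used(L2_GAS, CALLDATA_BYTES, 600_000_000_000, L2_GAS_PRICE)
print(f"quiet L1: {quiet_l1:,} gas, busy L1: {busy_l1:,} gas")
```

The L2 gas price is identical in both cases. Only the L1 price per byte changed, yet the reported gas use differs, which is why a fixed gas limit copied from a previous transaction can fail later.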

Gas Cost Differences

On Ethereum, gas costs are ordered as follows:

  • Storage = high

  • Computation = medium

  • Calldata = low

While on Arbitrum:

  • Storage = medium

  • Computation = low

  • Calldata = high

Searchers on Arbitrum are therefore incentivized to minimize calldata and execute as much of their arbitrage logic within the contract as possible. This is the opposite of mainnet, where it is very expensive to call other contracts to fetch values, so you should pack as much into calldata as you can to minimize computation, eliminate the use of storage, and simplify control flow.

On Arbitrum, slicing and dicing calldata is cheap, and so is reading values from other contracts. You should do that!
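
Here is a small sketch of what calldata minimization looks like in bytes. It compares standard ABI encoding (one 32-byte word per value) with a tightly packed layout that the contract would have to slice apart itself. The swap instruction, the field widths, and the pool address are all hypothetical. The gas function uses the mainnet calldata price from EIP-2028 as a stand-in; as far as I understand, Arbitrum charges for an estimate of the compressed size, so the real saving will be smaller than the raw numbers suggest.

```python
from typing import List, Tuple


def abi_encode_words(values: List[int]) -> bytes:
    """Standard ABI encoding of static values: one 32-byte word each."""
    return b"".join(value.to_bytes(32, "big") for value in values)


def pack_tight(fields: List[Tuple[int, int]]) -> bytes:
    """Concatenate (value, byte_width) pairs with no padding."""
    return b"".join(value.to_bytes(width, "big") for value, width in fields)


def l1_calldata_gas(data: bytes) -> int:
    """Mainnet calldata price (EIP-2028): 4 gas per zero byte, 16 otherwise."""
    return sum(4 if byte == 0 else 16 for byte in data)


# Hypothetical swap instruction: pool address, input amount, direction flag
POOL = int("11" * 20, 16)  # placeholder address, not a real pool
AMOUNT_IN = 10**18
ZERO_FOR_ONE = 1

padded = abi_encode_words([POOL, AMOUNT_IN, ZERO_FOR_ONE])
packed = pack_tight([(POOL, 20), (AMOUNT_IN, 16), (ZERO_FOR_ONE, 1)])

print(f"ABI encoded: {len(padded)} bytes, {l1_calldata_gas(padded)} gas")
print(f"packed:      {len(packed)} bytes, {l1_calldata_gas(packed)} gas")
```

The packed form only pays off if the contract can decode it cheaply, which is exactly the trade that Arbitrum's low computation cost makes attractive.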

For some more information on gas differences between L1 and L2, check out THIS excellent article from an Optimism developer.

NOTE: The executor contract below is a copy-paste of the mainnet contract, with some minor modifications. It makes no attempt at calldata minimization, so it is more expensive than it should be. I will return to this later when I attack the topic of contract optimizations.

Running an Arbitrum Nitro Node

First of all, consider running a node! You’re free to use an external RPC if you prefer, but I prefer the security and convenience of a local node wherever possible. Not having to work around rate limits is a huge win. If you only use it to avoid rate limits and cloud hosting fees, I believe you’re ahead.

My Arbitrum node currently consumes 338 GB of storage and keeps a CPU core lightly loaded.

OffchainLabs (the developers of Arbitrum) publishes their Nitro Node client to Docker Hub. The packages are all versioned with no convenient “latest” or “stable” tag, so I recommend subscribing to their GitHub repository and getting notifications for each new release (but don’t forget to manually update your Docker container afterwards).

You can set it up very simply with the following compose file. It will set up an HTTP and Websocket endpoint, retrieve L1 state data from the mainnet Ankr RPC, and attempt to generate its state data from a snapshot hosted by Arbitrum.

BUGFIX 2023-05-01: A user in Discord has reported that newer state snapshots are not extracted correctly. Please review the short demonstration after the compose file below to see how to work around this until it is fixed.

NOTE: If you intend to connect your Nitro node to a local Ethereum node also running in Docker, specify the L1 URL as the container name for your Ethereum node (mine is named geth). Also, be sure to place the two containers in the same network. You can do this by adding the Nitro node to the same docker-compose.yml file, or by specifying a particular network name for the Nitro node.

arbitrum-node/docker-compose.yml

version: "2"

services:
  nitro:
    image: docker.io/offchainlabs/nitro-node:v2.0.13-174496c
    container_name: nitro
    restart: unless-stopped
    ports:
      - "8547:8547/tcp"
      - "8548:8548/tcp"
    command:
      - --http.port=8547
      - --http.addr=0.0.0.0
      - --http.corsdomain=*
      - --http.vhosts=*
      - --http.api=net,web3,eth
      - --ws.port=8548
      - --ws.addr=0.0.0.0
      - --ws.origins=*
      - --ws.api=net,web3,eth
      # change this to localhost or a container name if you're running 
      # a local Ethereum node
      - --l1.url=https://rpc.ankr.com/eth
      - --l2.chain-id=42161
      - --init.url=https://snapshot.arbitrum.io/mainnet/nitro.tar
    volumes:
      - nitro_node_data:/home/user/.arbitrum

volumes:
  nitro_node_data:
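
Once the container is running, a quick way to confirm that the HTTP endpoint answers is to ask it for the chain ID and block height. This is a minimal sketch using only the standard library and the standard eth_chainId and eth_blockNumber JSON-RPC methods, which the eth API enabled above should expose. The helper names are my own, and the URI assumes the port mapping from the compose file.

```python
import json
import urllib.request
from typing import Optional

NODE_URI_HTTP = "http://localhost:8547"  # HTTP port published by the compose file
EXPECTED_CHAIN_ID = 42161  # same value as --l2.chain-id


def build_request(method: str, params: Optional[list] = None) -> bytes:
    """Encode a JSON-RPC 2.0 request body."""
    payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params or []}
    return json.dumps(payload).encode()


def parse_quantity(response_body: bytes) -> int:
    """Decode a hex quantity result such as "0xa4b1"."""
    reply = json.loads(response_body)
    if "error" in reply:
        raise RuntimeError(f"RPC error: {reply['error']}")
    return int(reply["result"], 16)


def rpc_quantity(method: str, uri: str = NODE_URI_HTTP) -> int:
    """POST one request to the node and return the decoded integer result."""
    request = urllib.request.Request(
        uri,
        data=build_request(method),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return parse_quantity(response.read())


def check_node(uri: str = NODE_URI_HTTP) -> None:
    """Print chain ID and block height; raise if the chain ID is unexpected."""
    chain_id = rpc_quantity("eth_chainId", uri)
    if chain_id != EXPECTED_CHAIN_ID:
        raise RuntimeError(f"unexpected chain ID {chain_id}")
    height = rpc_quantity("eth_blockNumber", uri)
    print(f"chain ID {chain_id}, block height {height}")
```

Call check_node() after the node has started. While the node is still syncing, expect the reported height to lag well behind the chain tip.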

Snapshot Hotfix

EDIT: Arbitrum has fixed their snapshots, so the below workaround is no longer necessary.

I have not figured out the source of this bug, but recent snapshots from the Arbitrum website do not initialize correctly into the data volume. I will report this to the devs, but I cannot say when it will be fixed.

I’ve developed a workaround. The workaround is to bring the docker compose project up (but not start it), which creates the appropriate data volume. Then unpack the nitro.tar archive into it, adjust permissions to match the UID/GID of the user that the Docker container uses (UID 1000 / GID 1000), then finally start the container which will find the archived data and resume from there as normal.

Here is a demonstration of the workaround on a virtual machine running Pop!_OS (an Ubuntu variant):

test@pop-os:~$ sudo docker-compose up --no-start
Creating network "test_default" with the default driver
Creating nitro ... done

test@pop-os:~$ wget https://snapshot.arbitrum.io/mainnet/nitro.tar

[wait for download to complete, roughly 230 GB]

test@pop-os:~$ sudo su -
root@pop-os:~# cd /var/lib/docker/volumes/nitro_node_data/_data/
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# mkdir arb1
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# tar xvf ~test/nitro.tar -C arb1

[the snapshot will be extracted]

root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# chown -R 1000:1000 arb1
root@pop-os:/var/lib/docker/volumes/nitro_node_data/_data# exit

test@pop-os:~$ sudo docker-compose up

[here is some output from a successful startup]

Starting nitro ... done
Attaching to nitro
nitro    | INFO [05-02|03:58:36.196] created jwt file                         fileName=/home/user/.arbitrum/arb1/nitro/jwtsecret
nitro    | INFO [05-02|03:58:36.196] Running Arbitrum nitro node              revision=v2.0.13-174496c vcs.time=2023-03-28T17:34:23-06:00
nitro    | INFO [05-02|03:58:36.196] connected to l1 chain                    l1url=https://rpc.ankr.com/eth l1chainid=1
nitro    | INFO [05-02|03:58:36.197] Allocated cache and file handles         database=/home/user/.arbitrum/arb1/nitro/l2chaindata cache=16.00MiB handles=16 readonly=true
nitro    | INFO [05-02|03:58:39.750] Found legacy ancient chain path          location=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient
nitro    | INFO [05-02|03:58:39.754] Opened ancient database                  database=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient readonly=true
nitro    | INFO [05-02|03:58:39.797] Allocated cache and file handles         database=/home/user/.arbitrum/arb1/nitro/l2chaindata cache=2.00GiB  handles=512
nitro    | INFO [05-02|03:58:43.224] Found legacy ancient chain path          location=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient
nitro    | INFO [05-02|03:58:43.226] Opened ancient database                  database=/home/user/.arbitrum/arb1/nitro/l2chaindata/ancient readonly=false
nitro    | INFO [05-02|03:58:44.237] Loaded most recent local header          number=22,207,816 hash=a903d8..20d8fe td=0 age=8mo3d13h
nitro    | INFO [05-02|03:58:44.237] Loaded most recent local full block      number=22,207,817 hash=7d237d..c07986 td=1 age=8mo3d13h
nitro    | INFO [05-02|03:58:44.237] Loaded most recent local fast block      number=22,207,816 hash=a903d8..20d8fe td=0 age=8mo3d13h
nitro    | INFO [05-02|03:58:44.242] Allocated cache and file handles         database=/home/user/.arbitrum/arb1/nitro/arbitrumdata cache=16.00MiB handles=16
nitro    | INFO [05-02|03:58:44.257] Allocated cache and file handles         database=/home/user/.arbitrum/arb1/nitro/classic-msg cache=16.00MiB handles=16  readonly=true
nitro    | INFO [05-02|03:58:44.282] Upgrading chain index                    type=bloombits percentage=0
nitro    | INFO [05-02|03:58:44.283] Starting peer-to-peer node               instance=nitro/vv2.0.13-174496c/linux-amd64/go1.19.7
nitro    | WARN [05-02|03:58:44.283] P2P server will be useless, neither dialing nor listening 
nitro    | INFO [05-02|03:58:44.294] Loaded JWT secret file                   path=/home/user/.arbitrum/arb1/nitro/jwtsecret crc32=0xfb35acf4
nitro    | INFO [05-02|03:58:44.294] HTTP server started                      endpoint=[::]:8547 auth=false prefix= cors=* vhosts=*
nitro    | INFO [05-02|03:58:44.294] WebSocket enabled                        url=ws://[::]:8548
nitro    | INFO [05-02|03:58:44.294] WebSocket enabled                        url=ws://127.0.0.1:8549
nitro    | INFO [05-02|03:58:44.294] HTTP server started                      endpoint=127.0.0.1:8549 auth=true  prefix= cors=localhost vhosts=
nitro    | INFO [05-02|03:58:44.295] New local node record                    seq=1,682,999,924,294 id=30fe73f1a6f560a9 ip=127.0.0.1 udp=0 tcp=0
nitro    | INFO [05-02|03:58:44.295] Started P2P networking                   self=enode://f00f453b94a4b1d7416c32d5a6dcf4c708d7cfb46be5293743d452e08f9cea4fb0f23561eeeeda1de60370460937646e88343af03357a41aa905e6d7b78a13af@127.0.0.1:0
nitro    | WARN [05-02|03:58:44.298] Error reading unclean shutdown markers   error="leveldb: not found"
nitro    | INFO [05-02|03:58:44.298] InboxTracker                             SequencerBatchCount=0
nitro    | WARN [05-02|03:58:44.781] empty sequencer message 
nitro    | WARN [05-02|03:58:44.781] reading virtual delayed message segment  delayedMessagesRead=0 afterDelayedMessages=1
nitro    | INFO [05-02|03:58:44.781] InboxTracker                             sequencerBatchCount=1 messageCount=1 l1Block=15,411,056 l1Timestamp=2022-08-25T20:05:44+0000
nitro    | INFO [05-02|03:58:52.069] InboxTracker                             sequencerBatchCount=2 messageCount=722 l1Block=15,447,728 l1Timestamp=2022-08-31T16:43:24+0000
nitro    | INFO [05-02|03:58:52.551] Upgrading chain index                    type=bloombits percentage=2
nitro    | INFO [05-02|03:58:52.951] created block                            l2Block=22,207,818 l2BlockHash=d1882c..7f8e2f l1Block=15,447,473 l1Timestamp=2022-08-31T15:45:26+0000
nitro    | INFO [05-02|03:58:53.953] created block                            l2Block=22,207,826 l2BlockHash=004362..f199d0 l1Block=15,447,567 l1Timestamp=2022-08-31T16:08:14+0000
nitro    | INFO [05-02|03:58:53.956] Indexing transactions                    blocks=18463 txs=20971 tail=22,189,052 total=22,207,515 elapsed=1.004s

[...]

NOTE: The compose file above uses native Docker volumes, which are stored in /var/lib/docker/volumes on a Linux system. If you are running on macOS or Windows, adjust the locations and exact commands as needed to extract the snapshot to the correct location and set the permissions correctly.

Executor Contract

The contract implements a simple V3 callback that checks the pool address and repays a swap if a valid callback is made. Nothing exotic here beyond cleaning up some external calls by using an interface instead of raw_call.

arbitrum_executor_v3.vy

# @version ^0.3

from vyper.interfaces import ERC20 as IERC20

interface IWETH:    
    def deposit(): payable

interface IUniswapV3Pool:
    def factory() -> address: view
    def fee() -> uint24: view
    def tickSpacing() -> int24: view
    def token0() -> address: view
    def token1() -> address: view
    def maxLiquidityPerTick() -> uint128: view

OWNER_ADDR: immutable(address)
WETH_ADDR: constant(address) = 0x82aF49447D8a07e3bd95BD0d56f35241523fBab1
V3_FACTORY: constant(address) = 0x1F98431c8aD98523631AE4a59f267346ea31F984

MAX_PAYLOADS: constant(uint256) = 16
MAX_PAYLOAD_BYTES: constant(uint256) = 1024

struct payload:
    target: address
    calldata: Bytes[MAX_PAYLOAD_BYTES]
    value: uint256


@external
@payable
def __init__():

    OWNER_ADDR = msg.sender    
    
    # wrap initial Ether to WETH
    if msg.value > 0:
        IWETH(WETH_ADDR).deposit(value=msg.value)


@external
@payable
def execute_payloads(
    payloads: DynArray[payload, MAX_PAYLOADS],
):

    assert msg.sender == OWNER_ADDR, "!OWNER"
   
    for _payload in payloads:
        raw_call(
            _payload.target,
            _payload.calldata,
            value=_payload.value,
        )


@internal
@pure
def verifyCallback(
    tokenA: address, 
    tokenB: address, 
    fee: uint24
) -> address:   
            
    token0: address = tokenA
    token1: address = tokenB

    if convert(tokenA,uint160) > convert(tokenB,uint160):        
        token0 = tokenB
        token1 = tokenA
        
    return convert(
        slice(
            convert(
                convert(
                    keccak256(
                        concat(
                            b'\xFF',
                            convert(V3_FACTORY,bytes20),
                            keccak256(
                                _abi_encode(
                                    token0,
                                    token1,
                                    fee
                                )
                            ),
                            0xe34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54,
                        )
                    ),
                    uint256
                ),
                bytes32
            ),
            12,
            20,
        ),
        address
    )


@external
def uniswapV3SwapCallback(
    amount0_delta: int256, 
    amount1_delta: int256, 
    data: Bytes[32]
):
    assert amount0_delta > 0 or amount1_delta > 0, "REJECTED 0 LIQUIDITY SWAP"

    # get the token0/token1 addresses and fee reported by msg.sender   
    token0: address = IUniswapV3Pool(msg.sender).token0()
    token1: address = IUniswapV3Pool(msg.sender).token1()
    fee: uint24 = IUniswapV3Pool(msg.sender).fee()
    
    assert msg.sender == self.verifyCallback(token0,token1,fee), "!V3LP"

    # transfer token back to pool
    if amount0_delta > 0:
        IERC20(token0).transfer(msg.sender,convert(amount0_delta, uint256))        
    elif amount1_delta > 0:
        IERC20(token1).transfer(msg.sender,convert(amount1_delta, uint256))        


@external
@payable
def __default__():
    # accept basic Ether transfers to the contract with no calldata
    if len(msg.data) == 0:
        return
    
    # revert on all other calls
    else:
        raise
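
If you want to sanity-check the address math in verifyCallback off-chain, the same CREATE2 derivation can be sketched in Python. This mirrors the contract step by step: sort the tokens, hash the ABI-encoded (token0, token1, fee) tuple to get the salt, then hash 0xff, the factory, the salt, and the pool init code hash. The function name is my own, and the hash function is passed in as a parameter because the standard library does not ship keccak256 (hashlib.sha3_256 is a different function and will give wrong addresses).

```python
from typing import Callable

# Both constants are copied from the contract above
V3_FACTORY = bytes.fromhex("1F98431c8aD98523631AE4a59f267346ea31F984")
POOL_INIT_CODE_HASH = bytes.fromhex(
    "e34f199b19b2b4f47f68442619d555527d244f78a3297ea89325f843f87b8b54"
)


def _word(value: int) -> bytes:
    """Left-pad an integer to a 32-byte ABI word."""
    return value.to_bytes(32, "big")


def compute_v3_pool_address(
    token_a: str,
    token_b: str,
    fee: int,
    keccak256: Callable[[bytes], bytes],
) -> str:
    """Derive a Uniswap V3 pool address the same way verifyCallback does."""
    # sort the tokens numerically, like the uint160 comparison in the contract
    token0, token1 = sorted((int(token_a, 16), int(token_b, 16)))

    # salt = keccak256(abi.encode(token0, token1, fee))
    salt = keccak256(_word(token0) + _word(token1) + _word(fee))

    # CREATE2: keccak256(0xff ++ deployer ++ salt ++ init code hash)
    digest = keccak256(b"\xff" + V3_FACTORY + salt + POOL_INIT_CODE_HASH)

    # the address is the last 20 bytes (lowercase hex, not checksummed)
    return "0x" + digest[12:].hex()
```

For real use, pass in a keccak256 implementation such as keccak from the eth_utils package, then compare the result against the pool addresses saved by the V3 fetcher below. The token order does not matter since the function sorts them first.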

LP Fetcher and Parser Scripts

arbitrum_lp_fetcher_uniswapv2_json.py

from brownie import network, Contract, chain
import sys
import json
import time

BROWNIE_NETWORK = "arbitrum-local"

# starting # of blocks to request with getLogs
BLOCK_SPAN = 5_000

try:
    network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "SushiSwap",
        "filename": "arbitrum_sushiswap_lps.json",
        "factory_address": "0xc35DADB65012eC5796536bD9864eD8773aBc74C4",
        "factory_deployment_block": 70,
    },
]

newest_block = chain.height

for name, factory_address, filename, deployment_block in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
    )
    for exchange in exchanges
]:

    print(f"DEX: {name}")

    try:
        factory_contract = Contract(factory_address)
    except:
        try:
            factory_contract = Contract.from_explorer(factory_address)
        except:
            factory_contract = None
    finally:
        if factory_contract is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    failure = False
    start_block = previous_block + 1

    while True:

        if failure:
            # reduce the working span by 10%
            BLOCK_SPAN = int(0.9 * BLOCK_SPAN)

        else:
            # increase the working span by .1%
            BLOCK_SPAN = int(1.001 * BLOCK_SPAN)

        # stop once every block up to the chain tip has been requested
        if start_block > newest_block:
            break

        end_block = min(newest_block, start_block + BLOCK_SPAN)

        try:
            pool_created_events = factory_contract.events.PairCreated.getLogs(
                fromBlock=start_block, toBlock=end_block
            )
        except ValueError as e:
            failure = True
            time.sleep(1)
            continue
        else:
            print(
                f"Fetched PairCreated events, block range [{start_block},{end_block}]"
            )
            # set the next start block
            start_block = end_block + 1
            failure = False

            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.get("pair"),
                        "token0": event.args.get("token0"),
                        "token1": event.args.get("token1"),
                        "block_number": event.get("blockNumber"),
                        "pool_id": event.args.get(""),
                        "type": "UniswapV2",
                    }
                )
            # save once per fetched block range instead of once per event
            with open(filename, "w") as file:
                json.dump(lp_data, file, indent=2)

    print(f"Saved {len(lp_data) - previous_pool_count} new pools")
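
The block span logic is the part of these fetchers most worth reusing, so here is a stripped-down sketch of it as a generator. It assumes the node signals an oversized request by raising ValueError, as the script above does. The function name and the fetch callable are hypothetical, the one-second sleep is left out, and the span growth differs slightly so that small spans can still recover.

```python
from typing import Callable, Iterator, List, Tuple


def fetch_in_chunks(
    fetch: Callable[[int, int], List[dict]],
    first_block: int,
    last_block: int,
    span: int = 5_000,
) -> Iterator[Tuple[int, int, List[dict]]]:
    """Yield (start, end, logs) for consecutive inclusive block ranges."""
    start = first_block
    while start <= last_block:
        end = min(last_block, start + span)
        try:
            logs = fetch(start, end)
        except ValueError:
            # request rejected: shrink the span by 10% and retry the same start
            span = max(1, int(0.9 * span))
            continue
        yield start, end, logs
        # success: move past this range and grow the span by roughly 0.1%
        start = end + 1
        span = max(span + 1, int(1.001 * span))
```

Note that a fetch that always fails will retry forever, just like the original loop, so add a retry limit if you point this at an unreliable RPC.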

arbitrum_lp_fetcher_uniswapv3_json.py

from brownie import network, Contract, chain
import sys
import json
import time

BROWNIE_NETWORK = "arbitrum-local"

# starting block span to process with getLogs
BLOCK_SPAN = 5_000

try:
    network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "Uniswap V3",
        "filename": "arbitrum_uniswapv3_lps.json",
        "factory_address": "0x1f98431c8ad98523631ae4a59f267346ea31f984",
        "factory_deployment_block": 165,
    },
]

newest_block = chain.height

for name, factory_address, filename, deployment_block in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
    )
    for exchange in exchanges
]:

    print(f"DEX: {name}")

    try:
        factory_contract = Contract(factory_address)
    except:
        try:
            factory_contract = Contract.from_explorer(factory_address)
        except:
            factory_contract = None
    finally:
        if factory_contract is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    failure = False
    start_block = previous_block + 1

    while True:

        if failure:
            # reduce the working span by 10%
            BLOCK_SPAN = int(0.9 * BLOCK_SPAN)
        else:
            # increase the working span by .1%
            BLOCK_SPAN = int(1.001 * BLOCK_SPAN)

        # stop once every block up to the chain tip has been requested
        if start_block > newest_block:
            break

        end_block = min(newest_block, start_block + BLOCK_SPAN)

        try:
            pool_created_events = factory_contract.events.PoolCreated.getLogs(
                fromBlock=start_block, toBlock=end_block
            )
        except ValueError as e:
            failure = True
            time.sleep(1)
            continue
        else:
            print(
                f"Fetched PoolCreated events, block range [{start_block},{end_block}]"
            )
            # set the next start block
            start_block = end_block + 1
            failure = False

            # print(pool_created_events)
            for event in pool_created_events:
                lp_data.append(
                    {
                        "pool_address": event.args.get("pool"),
                        "fee": event.args.get("fee"),
                        "token0": event.args.get("token0"),
                        "token1": event.args.get("token1"),
                        "block_number": event.get("blockNumber"),
                        "type": "UniswapV3",
                    }
                )
            with open(filename, "w") as file:
                json.dump(lp_data, file, indent=2)

    print(f"Saved {len(lp_data) - previous_pool_count} new pools")

arbitrum_lp_fetcher_camelot_json.py

from brownie import network, Contract, chain
import sys
import json
import time

BROWNIE_NETWORK = "arbitrum-local"

# starting # of blocks to request with getLogs
BLOCK_SPAN = 5_000

try:
    network.connect(BROWNIE_NETWORK)
except:
    sys.exit("Could not connect!")

exchanges = [
    {
        "name": "Camelot",
        "filename": "arbitrum_camelot_lps.json",
        "factory_address": "0x6EcCab422D763aC031210895C81787E87B43A652",
        "factory_deployment_block": 35061163,
    },
]

newest_block = chain.height

for name, factory_address, filename, deployment_block in [
    (
        exchange["name"],
        exchange["factory_address"],
        exchange["filename"],
        exchange["factory_deployment_block"],
    )
    for exchange in exchanges
]:

    print(f"DEX: {name}")

    try:
        factory_contract = Contract(factory_address)
    except:
        try:
            factory_contract = Contract.from_explorer(factory_address)
        except:
            factory_contract = None
    finally:
        if factory_contract is None:
            sys.exit("FACTORY COULD NOT BE LOADED")

    try:
        with open(filename) as file:
            lp_data = json.load(file)
    except FileNotFoundError:
        lp_data = []

    if lp_data:
        previous_pool_count = len(lp_data)
        print(f"Found previously-fetched data: {previous_pool_count} pools")
        previous_block = lp_data[-1].get("block_number")
        print(f"Found pool data up to block {previous_block}")
    else:
        previous_pool_count = 0
        previous_block = deployment_block

    failure = False
    start_block = previous_block + 1

    while True:

        if failure:
            # reduce the working span by 10%
            BLOCK_SPAN = int(0.9 * BLOCK_SPAN)

        else:
            # increase the working span by .1%
            BLOCK_SPAN = int(1.001 * BLOCK_SPAN)

        # stop once every block up to the chain tip has been requested
        if start_block > newest_block:
            break

        end_block = min(newest_block, start_block + BLOCK_SPAN)

        try:
            pool_created_events = factory_contract.events.PairCreated.getLogs(
                fromBlock=start_block, toBlock=end_block
            )
        except ValueError as e:
            failure = True
            time.sleep(1)
            continue
        else:
            print(
                f"Fetched PairCreated events, block range [{start_block},{end_block}]"
            )
            # set the next start block
            start_block = end_block + 1
            failure = False

            for event in pool_created_events:

                lp_data.append(
                    {
                        "pool_address": event.args.get("pair"),
                        "token0": event.args.get("token0"),
                        "token1": event.args.get("token1"),
                        "block_number": event.get("blockNumber"),
                        "pool_id": event.args.get("length"),
                        "type": "CamelotV2",
                    }
                )
            # save once per fetched block range instead of once per event
            with open(filename, "w") as file:
                json.dump(lp_data, file, indent=2)

    print(f"Saved {len(lp_data) - previous_pool_count} new pools")

arbitrum_uniswapv3_liquidity_events_fetcher.py

import asyncio
import json
import sys
import brownie
import web3

from web3._utils.filters import construct_event_filter_params
from web3._utils.events import get_event_data

import degenbot as bot

NODE_URI_HTTP = "http://localhost:8547"
NODE_URI_WS = "ws://localhost:8548"

BROWNIE_NETWORK = "arbitrum-local"

UNISWAPV3_START_BLOCK = 165


async def prime_pools():
    print("Starting pool primer")

    liquidity_snapshot = {}

    try:
        with open("arbitrum_liquidity_snapshot.json", "r") as file:
            json_liquidity_snapshot = json.load(file)
    except:
        snapshot_last_block = None
    else:
        snapshot_last_block = json_liquidity_snapshot["snapshot_block"]
        print(
            # subtract 1 so the "snapshot_block" key is not counted as a pool
            f"Loaded LP snapshot: {len(json_liquidity_snapshot) - 1} pools @ block {snapshot_last_block}"
        )

        assert (
            snapshot_last_block < newest_block
        ), f"Aborting, snapshot block ({snapshot_last_block}) is newer than current chain height ({newest_block})"

        for pool_address, snapshot in [
            (k, v)
            for k, v in json_liquidity_snapshot.items()
            if k not in ["snapshot_block"]
        ]:
            liquidity_snapshot[pool_address] = {
                "tick_bitmap": {
                    int(k): v for k, v in snapshot["tick_bitmap"].items()
                },
                "tick_data": {
                    int(k): v for k, v in snapshot["tick_data"].items()
                },
            }

    V3LP = w3.eth.contract(abi=bot.uniswap.v3.abi.UNISWAP_V3_POOL_ABI)

    liquidity_events = {}

    for event in [V3LP.events.Mint, V3LP.events.Burn]:
        print(f"processing {event.event_name} events")

        start_block = (
            max(UNISWAPV3_START_BLOCK, snapshot_last_block + 1)
            if snapshot_last_block is not None
            else UNISWAPV3_START_BLOCK
        )
        block_span = 25_000
        done = False

        event_abi = event._get_event_abi()

        while not done:
            end_block = min(newest_block, start_block + block_span)

            _, event_filter_params = construct_event_filter_params(
                event_abi=event_abi,
                abi_codec=w3.codec,
                argument_filters={},
                fromBlock=start_block,
                toBlock=end_block,
            )

            try:
                event_logs = w3.eth.get_logs(event_filter_params)
            except:
                print(f"TIMEOUT: {block_span=}")
                block_span = int(0.5 * block_span)
                continue

            # use a distinct name so the outer Mint/Burn loop variable is not shadowed
            for log in event_logs:
                decoded_event = get_event_data(w3.codec, event_abi, log)

                pool_address = decoded_event["address"]
                block = decoded_event["blockNumber"]
                tx_index = decoded_event["transactionIndex"]
                liquidity = decoded_event["args"]["amount"] * (
                    -1 if decoded_event["event"] == "Burn" else 1
                )
                tick_lower = decoded_event["args"]["tickLower"]
                tick_upper = decoded_event["args"]["tickUpper"]

                # skip zero liquidity events
                if liquidity == 0:
                    continue

                try:
                    liquidity_events[pool_address]
                except KeyError:
                    liquidity_events[pool_address] = []

                liquidity_events[pool_address].append(
                    (
                        block,
                        tx_index,
                        (
                            liquidity,
                            tick_lower,
                            tick_upper,
                        ),
                    )
                )

            print(f"Fetched events: block span [{start_block},{end_block}]")

            if end_block == newest_block:
                done = True
            else:
                start_block = end_block + 1
                block_span = int(1.01 * block_span)

    for pool_address in liquidity_events.keys():
        try:
            snapshot_tick_data = liquidity_snapshot[pool_address]["tick_data"]
        except KeyError:
            snapshot_tick_data = {}

        try:
            snapshot_tick_bitmap = liquidity_snapshot[pool_address][
                "tick_bitmap"
            ]
        except KeyError:
            snapshot_tick_bitmap = {}

        try:
            lp_helper = bot.V3LiquidityPool(
                address=pool_address,
                silent=True,
                tick_data=snapshot_tick_data,
                tick_bitmap=snapshot_tick_bitmap,
            )
        except Exception as e:
            # print(f"EXCEPTION - pool {pool_address}: {e}")
            continue

        sorted_liquidity_events = sorted(
            liquidity_events[pool_address],
            key=lambda event: (event[0], event[1]),
        )

        for liquidity_event in sorted_liquidity_events:
            (
                event_block,
                _,
                (liquidity_delta, tick_lower, tick_upper),
            ) = liquidity_event

            lp_helper.external_update(
                updates={
                    "liquidity_change": (
                        liquidity_delta,
                        tick_lower,
                        tick_upper,
                    )
                },
                block_number=event_block,
                fetch_missing=False,
                force=True,
            )

        liquidity_snapshot[pool_address] = {
            "tick_data": lp_helper.tick_data,
            "tick_bitmap": {
                key: value
                for key, value in lp_helper.tick_bitmap.items()
                if key != "sparse"  # ignore sparse key
                if value["bitmap"]
            },
        }

    liquidity_snapshot["snapshot_block"] = newest_block

    with open("arbitrum_liquidity_snapshot.json", "w") as file:
        json.dump(liquidity_snapshot, file, indent=2)
        print("Wrote LP snapshot")


# Create a reusable web3 object to communicate with the node
w3 = web3.Web3(web3.HTTPProvider(NODE_URI_HTTP))

try:
    brownie.network.connect(BROWNIE_NETWORK)
except Exception:
    sys.exit(
        "Could not connect! Verify your Brownie network settings using 'brownie networks list'"
    )

newest_block = brownie.chain.height

if __name__ == "__main__":
    asyncio.run(prime_pools())
    print("Complete")
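
The fetcher sorts each pool's events by block and transaction index before handing them to the helper, because liquidity changes must be replayed in chain order. As a rough illustration of what that replay amounts to, here is a minimal sketch that assumes the standard Uniswap V3 tick bookkeeping (liquidityNet is increased at the lower tick and decreased at the upper tick, liquidityGross is increased at both). The function name replay_liquidity_events and the output layout are hypothetical; the real work is done by V3LiquidityPool.external_update, whose internals are not shown in this post.

```python
from collections import defaultdict


def replay_liquidity_events(events):
    """Replay (block, tx_index, (delta, tick_lower, tick_upper)) tuples in chain order.

    Returns {tick: {"liquidityNet": int, "liquidityGross": int}} for every tick
    that still holds liquidity after all events are applied.
    """
    ticks = defaultdict(lambda: {"liquidityNet": 0, "liquidityGross": 0})

    # Sort by block, then by position inside the block, as the fetcher does
    ordered = sorted(events, key=lambda event: (event[0], event[1]))

    for _block, _tx_index, (delta, tick_lower, tick_upper) in ordered:
        for tick in (tick_lower, tick_upper):
            gross = ticks[tick]["liquidityGross"] + delta
            if gross < 0:
                # A Burn processed before its Mint means the events are
                # out of order or incomplete
                raise ValueError(f"negative liquidity at tick {tick}")
            ticks[tick]["liquidityGross"] = gross

        # Mint: delta > 0, Burn: delta < 0 (the sign was applied by the fetcher)
        ticks[tick_lower]["liquidityNet"] += delta
        ticks[tick_upper]["liquidityNet"] -= delta

    # Drop ticks that no longer reference any liquidity
    return {
        tick: dict(data)
        for tick, data in ticks.items()
        if data["liquidityGross"]
    }


events = [
    (10, 1, (-400, -60, 60)),  # Burn, listed first but mined after the Mint
    (10, 0, (1000, -60, 60)),  # Mint
    (12, 3, (500, 0, 60)),  # Mint on a narrower range
]
tick_data = replay_liquidity_events(events)
print(tick_data)
```

Without the sort, the Burn in this example would be applied to an empty tick and raise, which is the kind of inconsistency the (block, tx_index) ordering protects against.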

arbitrum_parser_2pool.py

import json
import web3
import networkx as nx
import itertools

WETH_ADDRESS = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"

w3 = web3.Web3()

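BLACKLISTED_TOKENS = [
    # NOTE: the commented examples below are Ethereum mainnet addresses;
    # substitute the Arbitrum token addresses before enabling them
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7",  # USDT
]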


sushi_v2_lp_data = {}
for filename in [
    "arbitrum_sushiswap_lps.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            sushi_v2_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["pool_id"]
            }
print(f"Found {len(sushi_v2_lp_data)} Sushiswap V2 pools")

camelot_v2_lp_data = {}
for filename in [
    "arbitrum_camelot_lps.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            camelot_v2_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["pool_id"]
            }
print(f"Found {len(camelot_v2_lp_data)} Camelot V2 pools")


v3_lp_data = {}
for filename in [
    "arbitrum_uniswapv3_lps.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v3_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["block_number"]
            }
print(f"Found {len(v3_lp_data)} V3 pools")

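# include the Camelot pools so these sets cover every pool in the graph
all_v2_pools = set(sushi_v2_lp_data.keys()) | set(camelot_v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())

all_tokens = set(
    [lp.get("token0") for lp in sushi_v2_lp_data.values()]
    + [lp.get("token1") for lp in sushi_v2_lp_data.values()]
    + [lp.get("token0") for lp in camelot_v2_lp_data.values()]
    + [lp.get("token1") for lp in camelot_v2_lp_data.values()]
    + [lp.get("token0") for lp in v3_lp_data.values()]
    + [lp.get("token1") for lp in v3_lp_data.values()]
)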

# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in sushi_v2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in camelot_v2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="CamelotV2",
    )

for pool in v3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH_ADDRESS))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding two-pool arbitrage paths ***")
two_pool_arb_paths = {}

for token in all_tokens_with_weth_pool:

    pools = G.get_edge_data(token, WETH_ADDRESS).values()

    # skip tokens with only one pool
    if len(pools) < 2:
        continue

    for pool_a, pool_b in itertools.permutations(pools, 2):

        if pool_a.get("pool_type") == "UniswapV2":
            pool_a_dict = sushi_v2_lp_data.get(pool_a.get("lp_address"))
        elif pool_a.get("pool_type") == "CamelotV2":
            pool_a_dict = camelot_v2_lp_data.get(pool_a.get("lp_address"))
        elif pool_a.get("pool_type") == "UniswapV3":
            pool_a_dict = v3_lp_data.get(pool_a.get("lp_address"))
        else:
            raise Exception(f"could not identify pool {pool_a}")

        if pool_b.get("pool_type") == "UniswapV2":
            pool_b_dict = sushi_v2_lp_data.get(pool_b.get("lp_address"))
        elif pool_b.get("pool_type") == "CamelotV2":
            pool_b_dict = camelot_v2_lp_data.get(pool_b.get("lp_address"))
        elif pool_b.get("pool_type") == "UniswapV3":
            pool_b_dict = v3_lp_data.get(pool_b.get("lp_address"))
        else:
            raise Exception(f"could not identify pool {pool_b}")

        # hash the ordered pool addresses to get a unique ID for this path
        # (computed up front instead of via a walrus that shadows the builtin id)
        arb_id = w3.keccak(
            hexstr="".join(
                [
                    pool_a.get("lp_address")[2:],
                    pool_b.get("lp_address")[2:],
                ]
            )
        ).hex()

        two_pool_arb_paths[arb_id] = {
            "id": arb_id,
            "pools": {
                pool_a.get("lp_address"): pool_a_dict,
                pool_b.get("lp_address"): pool_b_dict,
            },
            "arb_types": ["cycle", "flash_borrow_lp_swap"],
            "path": [pool_a.get("lp_address"), pool_b.get("lp_address")],
        }
print(f"Found {len(two_pool_arb_paths)} unique two-pool arbitrage paths")

print("• Saving arb paths to JSON")
with open("arbitrum_arbs_2pool.json", "w") as file:
    json.dump(two_pool_arb_paths, file, indent=2)
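
To make the path-building step easier to follow, here is a minimal, dependency-free sketch of the same idea. It swaps the networkx MultiGraph for a plain dictionary keyed by the non-WETH token, and it skips the keccak ID and the pool metadata entirely. The function name find_two_pool_paths and the pool records are hypothetical placeholders, not real addresses.

```python
import itertools
from collections import defaultdict

WETH = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"


def find_two_pool_paths(pools, weth=WETH):
    """Return ordered (pool_a, pool_b) address pairs for every TOKEN/WETH pair."""
    pools_by_token = defaultdict(list)
    for pool in pools:
        tokens = {pool["token0"], pool["token1"]}
        if weth not in tokens or len(tokens) != 2:
            continue  # only TOKEN/WETH pools are relevant here
        (other_token,) = tokens - {weth}
        pools_by_token[other_token].append(pool["pool_address"])

    paths = []
    for addresses in pools_by_token.values():
        # permutations() yields both (a, b) and (b, a), one path per swap
        # direction; a token with a single pool yields nothing
        paths.extend(itertools.permutations(addresses, 2))
    return paths


# Placeholder pool records, NOT real addresses
example_pools = [
    {"pool_address": "0xSushi", "token0": "0xTKN", "token1": WETH},
    {"pool_address": "0xCamelot", "token0": "0xTKN", "token1": WETH},
    {"pool_address": "0xUniV3", "token0": WETH, "token1": "0xTKN"},
    {"pool_address": "0xLonely", "token0": "0xOTHER", "token1": WETH},
]
paths = find_two_pool_paths(example_pools)
print(len(paths))
```

Three pools for the same pair produce six ordered paths, which is why the parser uses permutations rather than combinations: buying in pool A and selling in pool B is a different arbitrage from the reverse.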

arbitrum_parser_3pool.py

import json
import web3
import networkx as nx
import itertools
import time

WETH = "0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"

w3 = web3.Web3()
start_timer = time.monotonic()

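BLACKLISTED_TOKENS = [
    # NOTE: the commented examples below are Ethereum mainnet addresses;
    # substitute the Arbitrum token addresses before enabling them
    # "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",  # USDC
    # "0xdAC17F958D2ee523a2206206994597C13D831ec7",  # USDT
]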

v2_lp_data = {}
for filename in [
    "arbitrum_sushiswap_lps.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v2_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["pool_id"]
            }
print(f"Found {len(v2_lp_data)} V2 pools")

v3_lp_data = {}
for filename in [
    "arbitrum_uniswapv3_lps.json",
]:
    with open(filename) as file:
        for pool in json.load(file):
            v3_lp_data[pool.get("pool_address")] = {
                key: value
                for key, value in pool.items()
                if key not in ["block_number"]
            }
print(f"Found {len(v3_lp_data)} V3 pools")

all_v2_pools = set(v2_lp_data.keys())
all_v3_pools = set(v3_lp_data.keys())

# build the graph with tokens as nodes, adding an edge
# between any two tokens held by a liquidity pool
G = nx.MultiGraph()
for pool in v2_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV2",
    )

for pool in v3_lp_data.values():
    G.add_edge(
        pool.get("token0"),
        pool.get("token1"),
        lp_address=pool.get("pool_address"),
        pool_type="UniswapV3",
    )

# delete nodes for blacklisted tokens
G.remove_nodes_from(BLACKLISTED_TOKENS)

print(f"G ready: {len(G.nodes)} nodes, {len(G.edges)} edges")

all_tokens_with_weth_pool = list(G.neighbors(WETH))
print(f"Found {len(all_tokens_with_weth_pool)} tokens with a WETH pair")

print("*** Finding triangular arbitrage paths ***")
triangle_arb_paths = {}

# only consider tokens (nodes) with degree > 1, which signifies
# that the token is held by more than one pool
filtered_tokens = [
    token for token in all_tokens_with_weth_pool if G.degree(token) > 1
]
print(f"Processing {len(filtered_tokens)} tokens with degree > 1")

# loop through all possible token pairs
for token_a, token_b in itertools.combinations(filtered_tokens, 2):

    # find tokenA/tokenB pools, skip if a tokenA/tokenB pool is not found
    if not G.get_edge_data(token_a, token_b):
        continue

    inside_pools = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, token_b).values()
    ]

    # find tokenA/WETH pools
    outside_pools_tokenA = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_a, WETH).values()
    ]

    # find tokenB/WETH pools
    outside_pools_tokenB = [
        edge.get("lp_address")
        for edge in G.get_edge_data(token_b, WETH).values()
    ]

    # find all triangular arbitrage paths of form:
    # tokenA/WETH -> tokenA/tokenB -> tokenB/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenA, inside_pools, outside_pools_tokenB
    ):

        # copy the stored data for each pool in the path
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_data[pool_address] = dict(v2_lp_data[pool_address])
            elif pool_address in all_v3_pools:
                pool_data[pool_address] = dict(v3_lp_data[pool_address])
            else:
                raise Exception(f"could not identify pool {pool_address}")

        # hash the ordered pool addresses to get a unique ID for this path
        arb_id = w3.keccak(
            hexstr="".join(
                [pool_address[2:] for pool_address in pool_addresses]
            )
        ).hex()

        triangle_arb_paths[arb_id] = {
            "id": arb_id,
            "path": pool_addresses,
            "pools": pool_data,
        }

    # find all triangular arbitrage paths of form:
    # tokenB/WETH -> tokenA/tokenB -> tokenA/WETH
    for pool_addresses in itertools.product(
        outside_pools_tokenB, inside_pools, outside_pools_tokenA
    ):
        # copy the stored data for each pool in the path
        pool_data = {}
        for pool_address in pool_addresses:
            if pool_address in all_v2_pools:
                pool_data[pool_address] = dict(v2_lp_data[pool_address])
            elif pool_address in all_v3_pools:
                pool_data[pool_address] = dict(v3_lp_data[pool_address])
            else:
                raise Exception(f"could not identify pool {pool_address}")

        # hash the ordered pool addresses to get a unique ID for this path
        arb_id = w3.keccak(
            hexstr="".join(
                [pool_address[2:] for pool_address in pool_addresses]
            )
        ).hex()

        triangle_arb_paths[arb_id] = {
            "id": arb_id,
            "path": pool_addresses,
            "pools": pool_data,
        }

print(
    f"Found {len(triangle_arb_paths)} triangle arb paths in {time.monotonic() - start_timer:.1f}s"
)

print("• Saving pool data to JSON")
with open("arbitrum_arbs_3pool.json", "w") as file:
    json.dump(triangle_arb_paths, file, indent=2)
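
The two itertools.product loops in the parser are where the triangle path count grows quickly. The following small sketch isolates that step for a single token pair; the function name triangle_paths and the pool labels are made up for illustration and stand in for real pool addresses.

```python
import itertools


def triangle_paths(pools_a_weth, pools_a_b, pools_b_weth):
    """Enumerate WETH -> A -> B -> WETH paths and their reverse for one token pair."""
    # forward: tokenA/WETH -> tokenA/tokenB -> tokenB/WETH
    forward = itertools.product(pools_a_weth, pools_a_b, pools_b_weth)
    # reverse: tokenB/WETH -> tokenA/tokenB -> tokenA/WETH
    reverse = itertools.product(pools_b_weth, pools_a_b, pools_a_weth)
    return list(forward) + list(reverse)


# Placeholder pool labels, NOT real addresses
inside_pools = ["A/B v3-500", "A/B v3-3000", "A/B v2"]
paths = triangle_paths(
    pools_a_weth=["A/WETH v2", "A/WETH v3-500"],
    pools_a_b=inside_pools,
    pools_b_weth=["B/WETH v2", "B/WETH v3-3000"],
)
print(len(paths))  # 2 * 3 * 2 paths in each direction
```

Every extra fee tier or V2 fork multiplies the count for each pair that uses it, so even a modest pool list can produce a large JSON file.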

Bot Discussion

The bot implements some nice new features that you should check out.

  • Graceful Shutdown — no more tracebacks when you stop a bot with CTRL+C

  • Camelot DEX — Camelot recently launched. It is a Sushiswap fork that offers split-fee Uniswap V2 pools (the fee for swapping token0 can differ from the fee for token1, but the calculations are otherwise the same). I have modified the LiquidityPool degenbot helper to support these split-fee pools, and the bot will load them if found.

  • Liquidity Snapshots — if the bot finds a file named arbitrum_liquidity_snapshot.json (which is created by arbitrum_uniswapv3_liquidity_events_fetcher.py), it will prime the V3LiquidityPool helpers with complete liquidity information during startup, then update them to the current block height after. No more endless liquidity data fetching! This is a huge startup time improvement for bots doing V3 arbitrage.

  • Pre-activated Arbitrage Helpers — After an arbitrage helper has been activated (meaning it generated a valid arbitrage payload that was tested via eth_call without reverting), its ID is added to a file named arbitrum_activated_arbs.json. If the bot finds this file on startup, all arbitrage helpers matching these IDs are moved to the active_arbs dictionary. They still need to be updated and recalculated for the current pool state as usual, but the gas testing (and re-testing on each startup) is now skipped.

  • Prioritized Activation — The activate_arbs coroutine used to do a simple loop through all arbitrage helpers. That worked, but arbs at the end of the dictionary would take a long time to be activated. Now the coroutine prioritizes activating arbitrage helpers in the following order: (1) inactive arbitrage helpers with a recently-activated pool, (2) pre-activated arbitrage helpers per the above note, (3) inactive arbitrage helpers in small batches, working sequentially through the complete list. This results in finding opportunities faster after a restart, since fresh and historically good opportunities are prioritized over stale ones.

  • “Low-Hanging Fruit” Sweeps — Instead of trying to compete with the infrastructure groups, this bot attempts to find arbitrage opportunities that they have missed. Rather than submitting payloads every block, it identifies profitable arbs every 10s (configurable of course) and submits a transaction to capture each one. This has allowed me to capture a lot of very small arbs that nobody else seemed to notice (most are between 10-25 cents worth of WETH) with little risk of reverts.

    • NOTE: hundreds of readers now know about this and have working code to secure it, so don’t expect this to last long.

  • RPC Execution — Flashbots isn’t available on Arbitrum, so we have to arb the old-fashioned way. Feel free to use, adapt, and improve the execute_arb_with_rpc method for any blockchain where you need to submit transactions directly instead of through a 3rd party relay API.

  • Triangle Arb Prioritization — The watch_events coroutine is responsible for selecting arbitrage paths after finding an update for a particular pool. 2-pool paths are straightforward, but triangle (3-pool) paths can quickly spiral out of control (since a Uniswap V3 DEX can have 4-5 pools for each pair at various fee levels, plus the V2 forks, plus the reverse paths). To cut down on excessive calculation (until I get parallel calculation implemented, another can of worms), this bot selects only 3-pool arbitrage paths where the updated pool is in the middle position. This cuts down on a lot of “nuisance” recalculations, with the assumption that an opportunity created at an edge pool will be more profitable in a 2-pool path than a 3-pool path with the middle hop close to parity.

    • NOTE: you can modify this easily if you prefer to calculate all the opportunities.
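
The middle-position rule from the last bullet can be sketched as a simple filter over the path dictionaries written by the parser scripts. This is only an illustration under assumptions: the function name select_paths_for_pool_update is hypothetical, the pool addresses are placeholders, and the real watch_events coroutine works on arbitrage helper objects rather than raw dictionaries.

```python
def select_paths_for_pool_update(arb_paths, updated_pool):
    """Return the IDs of arb paths worth recalculating after a pool update.

    arb_paths maps an ID to a dict with a "path" list of pool addresses,
    matching the layout of the JSON written by the parser scripts.
    """
    selected = []
    for arb_id, arb in arb_paths.items():
        path = arb["path"]
        if len(path) == 2 and updated_pool in path:
            # 2-pool paths: recalculate whenever either pool changes
            selected.append(arb_id)
        elif len(path) == 3 and path[1] == updated_pool:
            # 3-pool paths: only when the updated pool is the middle hop
            selected.append(arb_id)
    return selected


# Placeholder pool addresses, NOT real ones
arb_paths = {
    "two_pool": {"path": ["0xP1", "0xP2"]},
    "triangle_edge": {"path": ["0xP1", "0xP3", "0xP4"]},
    "triangle_middle": {"path": ["0xP5", "0xP1", "0xP4"]},
    "unrelated": {"path": ["0xP6", "0xP7"]},
}
selected = select_paths_for_pool_update(arb_paths, "0xP1")
print(selected)
```

To calculate every opportunity instead, replace the path[1] comparison with a plain membership check (updated_pool in path).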

Bot Source Code

arbitrum_cycle.py

This post is for paid subscribers

© 2025 BowTiedDevil