Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

response is ERROR when using 'multiprocessing' #3265

Open
kimroniny opened this issue Mar 5, 2024 · 4 comments · May be fixed by #3412
Open

response is ERROR when using 'multiprocessing' #3265

kimroniny opened this issue Mar 5, 2024 · 4 comments · May be fixed by #3412

Comments

@kimroniny
Copy link

kimroniny commented Mar 5, 2024

  • Version: 6.15.1
  • Python: 3.8.10
  • OS: linux/
  • pip freeze output
aiohttp==3.9.3
aiosignal==1.3.1
async-timeout==4.0.3
attrs==23.2.0
bitarray==2.9.2
brokenaxes==0.6.0
certifi==2024.2.2
charset-normalizer==3.3.2
contourpy==1.1.1
cycler==0.12.1
cytoolz==0.12.3
eth-account==0.11.0
eth-hash==0.6.0
eth-keyfile==0.7.0
eth-keys==0.5.0
eth-rlp==1.0.1
eth-typing==4.0.0
eth-utils==4.0.0
eth_abi==5.0.0
fonttools==4.49.0
frozenlist==1.4.1
hexbytes==0.3.1
idna==3.6
importlib-resources==6.1.1
jsonschema==4.21.1
jsonschema-specifications==2023.12.1
kiwisolver==1.4.5
lru-dict==1.2.0
matplotlib==3.7.5
multidict==6.0.5
numpy==1.24.4
packaging==23.2
parsimonious==0.9.0
pillow==10.2.0
pkgutil_resolve_name==1.3.10
protobuf==4.25.3
py-solc==3.2.0
pycryptodome==3.20.0
pyparsing==3.1.1
python-dateutil==2.8.2
pyunormalize==15.1.0
referencing==0.33.0
regex==2023.12.25
requests==2.31.0
rlp==4.0.0
rpds-py==0.18.0
semantic-version==2.10.0
six==1.16.0
toolz==0.12.1
typing_extensions==4.9.0
urllib3==2.2.1
web3==6.15.1
websockets==12.0
yarl==1.9.4
zipp==3.17.0

What was wrong?

Please include any of the following that are applicable:

  • The code which produced the error
from web3 import Web3, HTTPProvider

import multiprocessing

url = "http://ip:port"


def conn():
    w3 = Web3(HTTPProvider(endpoint_uri=url, request_kwargs={"timeout": 1000}))
    genesis_account = lambda: (
        w3.to_checksum_address(w3.eth.accounts[0]) if w3.eth.accounts else None
    )
    flag = w3.geth.personal.unlock_account(genesis_account(), "", 0)
    # execute one more time, then the exception occurs !!!
    genesis_account()


if __name__ == "__main__":
    conn()

    proc1 = multiprocessing.Process(target=conn)

    proc2 = multiprocessing.Process(target=conn)

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()
  • The full output of the error
MainProcess, session: 140271829895392
MainProcess, session: 140271829895392
MainProcess, session: 140271829895392
MainProcess, session: 140271829895392
MainProcess, session: 140271829895392
Process-1, session: 140271829895392
Process-2, session: 140271829895392
Process-2, session: 140271829895392
Process-1, session: 140271829895392
Process-2, session: 140271829895392
Process-1, session: 140271829895392
Process-2, session: 140271829895392
Process Process-2:
Traceback (most recent call last):
  File "/home/miniconda3/envs/py38/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/miniconda3/envs/py38/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "temp/para_web3.py", line 15, in conn
    genesis_account()
  File "temp/para_web3.py", line 11, in <lambda>
    w3.to_checksum_address(w3.eth.accounts[0]) if w3.eth.accounts else None
  File "/home/.local/share/virtualenvs/tor-experiment-L97qUE1y/lib/python3.8/site-packages/web3/eth/eth.py", line 114, in accounts
    return self._accounts()
  File "/home/.local/share/virtualenvs/tor-experiment-L97qUE1y/lib/python3.8/site-packages/web3/module.py", line 78, in caller
    return apply_result_formatters(result_formatters, result)
  File "cytoolz/functoolz.pyx", line 267, in cytoolz.functoolz.curry.__call__
  File "cytoolz/functoolz.pyx", line 263, in cytoolz.functoolz.curry.__call__
  File "/home/.local/share/virtualenvs/tor-experiment-L97qUE1y/lib/python3.8/site-packages/web3/module.py", line 49, in apply_result_formatters
    formatted_result = pipe(result, result_formatters)
  File "cytoolz/functoolz.pyx", line 680, in cytoolz.functoolz.pipe
  File "cytoolz/functoolz.pyx", line 655, in cytoolz.functoolz.c_pipe
  File "/home/.local/share/virtualenvs/tor-experiment-L97qUE1y/lib/python3.8/site-packages/eth_utils/functional.py", line 47, in inner
    return callback(fn(*args, **kwargs))
TypeError: 'bool' object is not iterable
Process-1, session: 140271829895392
Process-1, session: 140271829895392
  • What type of node you were connecting to.
Private ethereum node. Geth version is 1.10.0-stable.

How can it be fixed?

explain the output

I print session id in the format "{current_process().name}, session: {id(session)}".

The exception happens when querying eth.accounts. The response shoule be type: List[Account], while it is type: bool which should be the response type of unlock_account.

The detailed exception line is the code. It has obtained the response but faces the error type.

reason

When sending requests, web3 uses the cached session with SimpleCache(). 'SimpleCache()' is instantiated upon importing the web3 and won't be re-instantiated whenever creating a new Web3() .

In the main process, after executing conn() the first session is cached. When forking the children process, all objects including the cached session in the main process are also forked. However, the underlying connections of this session will be shared across all processes, which could cause the responses mixed up in different processes.

It is an absolutely dangerous BUG !!!

solution

  • solution 1.

The easiest solution is to change the cache key. Now the key is just tied to the thread identifier which is identical in the above codes. Just add the process identifier into the key.

  • solution 2.

Solution 1 is not the best because all process are still using the same one SimpleCache(). I think, it should check whether it is in a new forked process or not. If it is, then a new SimpleCache()` should be instantiated.

@kclowes
Copy link
Collaborator

kclowes commented Mar 6, 2024

We don't currently support multiprocessing. You may be able to sort of work around this by using two separate instances of w3, however.

@kimroniny
Copy link
Author

We don't currently support multiprocessing. You may be able to sort of work around this by using two separate instances of w3, however.

@kclowes

The problem is that even though I use two seperate instances of w3 in different processes, this ERROR still exists just like the above example.

Though it doesn't support multiprocessing, it shouldn't occur any errors when using seperate instances in different processes.

The key BUG is about when to initialize SimpleCache() and how to set the cache key. The cache key is only identical to the threading identifier. That's NOT enough in python.

@kclowes
Copy link
Collaborator

kclowes commented Mar 11, 2024

Got it. We'll triage and put it in our queue. Thanks for reporting!

@fselmo
Copy link
Collaborator

fselmo commented May 30, 2024

@kimroniny this is not reproducible, though I agree that a session cache should exist per provider instance even. If you have another reproducible example, it would help quite a bit. I did start a PR towards strapping a session cache manager to each instance of an http provider (or async http provider) which I think is the most reasonable approach and I hope that solves whatever problem you are seeing.

However, on the current v6 branch, with no changes to the session caching, I am getting different sessions:

Process: MainProcess
cache: odict_items([('46b10f397f77283a83ce39cad6f8520b', <requests.sessions.Session object at 0x114b0c1d0>)])
Process: Process-2
cache: odict_items([('46b10f397f77283a83ce39cad6f8520b', <requests.sessions.Session object at 0x10e1847d0>)])
Process: Process-1
cache: odict_items([('46b10f397f77283a83ce39cad6f8520b', <requests.sessions.Session object at 0x104eb05d0>)])

with the following code:

from web3 import Web3, HTTPProvider
import multiprocessing
from web3._utils.request import (
    _session_cache
)


def conn():
    w3 = Web3(HTTPProvider(endpoint_uri=HTTP_LOCAL, request_kwargs={"timeout": 1000}))
    w3.eth.block_number
    print(f"Process: {multiprocessing.current_process().name}")
    print(f"cache: {_session_cache._data.items()}")


if __name__ == "__main__":
    conn()

    proc1 = multiprocessing.Process(target=conn)
    proc2 = multiprocessing.Process(target=conn)

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join

Similar results on main branch (v7) and similar results on the refactor PR I created. Please provide a more clear and reproducible code snippet if you can.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants