Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[master] MongoDB returner reuse database connections #66238

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/55999.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve MongoDB returner performance by reusing database connections
41 changes: 25 additions & 16 deletions salt/returners/mongo_future_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@

log = logging.getLogger(__name__)

MONGO_CLIENT = None

# Define the module's virtual name
__virtualname__ = "mongo"

Expand Down Expand Up @@ -157,6 +159,8 @@ def _get_conn(ret):
"""
_options = _get_options(ret)

global MONGO_CLIENT

host = _options.get("host")
port = _options.get("port")
ssl = _options.get("ssl") or False
Expand All @@ -176,21 +180,26 @@ def _get_conn(ret):
" provided"
)
pymongo.uri_parser.parse_uri(uri)
conn = pymongo.MongoClient(uri)
mdb = conn.get_database()

if not MONGO_CLIENT:
MONGO_CLIENT = pymongo.MongoClient(uri)

mdb = MONGO_CLIENT.get_database()
else:
if PYMONGO_VERSION > Version("2.3"):
conn = pymongo.MongoClient(
host, port, username=user, password=password, ssl=ssl
)
if not MONGO_CLIENT:
MONGO_CLIENT = pymongo.MongoClient(
host, port, username=user, password=password, ssl=ssl
)
else:
if uri:
raise salt.exceptions.SaltConfigurationError(
"pymongo <= 2.3 does not support uri format"
)
conn = pymongo.Connection(host, port, username=user, password=password)
if not MONGO_CLIENT:
MONGO_CLIENT = pymongo.Connection(host, port, username=user, password=password)

mdb = conn[db_]
mdb = MONGO_CLIENT[db_]

if indexes:
if PYMONGO_VERSION > Version("2.3"):
Expand All @@ -204,14 +213,14 @@ def _get_conn(ret):
mdb.jobs.ensure_index("jid")
mdb.events.ensure_index("tag")

return conn, mdb
return mdb


def returner(ret):
"""
Return data to a mongodb server
"""
conn, mdb = _get_conn(ret)
mdb = _get_conn(ret)

if isinstance(ret["return"], dict):
back = _remove_dots(ret["return"])
Expand Down Expand Up @@ -292,7 +301,7 @@ def save_load(jid, load, minions=None):
"""
Save the load for a given job id
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
to_save = _safe_copy(load)

if PYMONGO_VERSION > Version("2.3"):
Expand All @@ -312,15 +321,15 @@ def get_load(jid):
"""
Return the load associated with a given job id
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
return mdb.jobs.find_one({"jid": jid}, {"_id": 0})


def get_jid(jid):
"""
Return the return information associated with a jid
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
ret = {}
rdata = mdb.saltReturns.find({"jid": jid}, {"_id": 0})
if rdata:
Expand All @@ -335,7 +344,7 @@ def get_fun(fun):
"""
Return the most recent jobs that have executed the named function
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
ret = {}
rdata = mdb.saltReturns.find_one({"fun": fun}, {"_id": 0})
if rdata:
Expand All @@ -347,7 +356,7 @@ def get_minions():
"""
Return a list of minions
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
ret = []
name = mdb.saltReturns.distinct("minion")
ret.append(name)
Expand All @@ -358,7 +367,7 @@ def get_jids():
"""
Return a list of job ids
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
map = "function() { emit(this.jid, this); }"
reduce = "function (key, values) { return values[0]; }"
result = mdb.jobs.inline_map_reduce(map, reduce)
Expand All @@ -380,7 +389,7 @@ def event_return(events):
"""
Return events to Mongodb server
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)

if isinstance(events, list):
events = events[0]
Expand Down
20 changes: 13 additions & 7 deletions salt/returners/mongo_return.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@

log = logging.getLogger(__name__)

MONGO_CLIENT = None

# Define the module's virtual name
# currently only used iby _get_options
__virtualname__ = "mongo"
Expand Down Expand Up @@ -126,6 +128,8 @@ def _get_conn(ret):
"""
_options = _get_options(ret)

global MONGO_CLIENT

host = _options.get("host")
port = _options.get("port")
ssl = _options.get("ssl") or False
Expand All @@ -139,10 +143,12 @@ def _get_conn(ret):
# a bunch of these sections that need to be supported

if PYMONGO_VERSION > Version("2.3"):
conn = pymongo.MongoClient(host=host, port=port, ssl=ssl)
if not MONGO_CLIENT:
MONGO_CLIENT = pymongo.MongoClient(host=host, port=port, ssl=ssl)
else:
conn = pymongo.Connection(host, port)
mdb = conn[db_]
if not MONGO_CLIENT:
MONGO_CLIENT = pymongo.Connection(host, port)
mdb = MONGO_CLIENT[db_]

if user and password:
mdb.authenticate(user, password)
Expand All @@ -159,14 +165,14 @@ def _get_conn(ret):

mdb.jobs.ensure_index("jid")

return conn, mdb
return mdb


def returner(ret):
"""
Return data to a mongodb server
"""
conn, mdb = _get_conn(ret)
mdb = _get_conn(ret)
col = mdb[ret["id"]]

if isinstance(ret["return"], dict):
Expand Down Expand Up @@ -207,7 +213,7 @@ def get_jid(jid):
"""
Return the return information associated with a jid
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
ret = {}
rdata = mdb.saltReturns.find({"jid": jid}, {"_id": 0})
if rdata:
Expand All @@ -222,7 +228,7 @@ def get_fun(fun):
"""
Return the most recent jobs that have executed the named function
"""
conn, mdb = _get_conn(ret=None)
mdb = _get_conn(ret=None)
ret = {}
rdata = mdb.saltReturns.find_one({"fun": fun}, {"_id": 0})
if rdata:
Expand Down