Change query_hash calculation method to concatenate original query and parameters instead of final query.
ehooi committed May 10, 2024
1 parent 4569191 commit d479136
Showing 14 changed files with 76 additions and 46 deletions.
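
For context, a minimal sketch of the new hashing scheme, matching the redash/utils/__init__.py hunk below: the hash is now computed from the normalized query text concatenated with a canonical JSON dump of the supplied parameters and the auto-limit flag, rather than from the parameter-substituted final query. The COMMENTS_REGEX here is a simplified stand-in for the pattern defined in redash/utils/__init__.py.

import hashlib
import json
import re

# Simplified stand-in for the COMMENTS_REGEX defined in redash/utils/__init__.py.
COMMENTS_REGEX = re.compile(r"/\*.*?\*/")

def gen_query_hash(sql, parameters={}, auto_limit=False):
    # Strip comments and all whitespace, as before.
    sql = COMMENTS_REGEX.sub("", sql)
    sql = "".join(sql.split())
    # New: append a canonical JSON encoding of the parameters and auto-limit flag,
    # so the hash depends on them instead of on the substituted final query.
    query_parameters = {"parameters": parameters, "auto_limit": auto_limit}
    sql += "\n" + json.dumps(query_parameters, sort_keys=True, separators=(",", ":"))
    return hashlib.md5(sql.encode("utf-8")).hexdigest()

# Same query text, different parameter values -> different hashes, so cached
# results for different parameter sets no longer collide.
assert gen_query_hash("SELECT {{n}}", {"n": 1}) != gen_query_hash("SELECT {{n}}", {"n": 2})
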
5 changes: 4 additions & 1 deletion redash/handlers/query_results.py
@@ -69,6 +69,8 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,

return error_response(message)

query_hash = data_source.query_runner.gen_query_hash(query.text, parameters, should_apply_auto_limit)

try:
query.apply(parameters)
except (InvalidParameterError, QueryDetachedFromDataSourceError) as e:
@@ -82,7 +84,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
if max_age == 0:
query_result = None
else:
query_result = models.QueryResult.get_latest(data_source, query_text, max_age)
query_result = models.QueryResult.get_latest(data_source, query_hash, max_age)

record_event(
current_user.org,
@@ -103,6 +105,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
else:
job = enqueue_query(
query_text,
query_hash,
data_source,
current_user.id,
current_user.is_api_user(),
10 changes: 3 additions & 7 deletions redash/models/__init__.py
@@ -70,7 +70,6 @@
)
from redash.utils import (
base_url,
gen_query_hash,
generate_token,
json_dumps,
json_loads,
@@ -352,9 +351,7 @@ def unused(cls, days=7):
)

@classmethod
def get_latest(cls, data_source, query, max_age=0):
query_hash = gen_query_hash(query)

def get_latest(cls, data_source, query_hash, max_age=0):
if max_age == -1 and settings.QUERY_RESULTS_EXPIRED_TTL_ENABLED:
max_age = settings.QUERY_RESULTS_EXPIRED_TTL

@@ -824,20 +821,19 @@ def dashboard_api_keys(self):
def update_query_hash(self):
should_apply_auto_limit = self.options.get("apply_auto_limit", False) if self.options else False
query_runner = self.data_source.query_runner if self.data_source else BaseQueryRunner({})
query_text = self.query_text

parameters_dict = {p["name"]: p.get("value") for p in self.parameters} if self.options else {}
if any(parameters_dict):
try:
query_text = self.parameterized.apply(parameters_dict).query
self.parameterized.apply(parameters_dict).query
except InvalidParameterError as e:
logging.info(f"Unable to update hash for query {self.id} because of invalid parameters: {str(e)}")
except QueryDetachedFromDataSourceError as e:
logging.info(
f"Unable to update hash for query {self.id} because of dropdown query {e.query_id} is unattached from datasource"
)

self.query_hash = query_runner.gen_query_hash(query_text, should_apply_auto_limit)
self.query_hash = query_runner.gen_query_hash(self.query_text, parameters_dict, should_apply_auto_limit)


@listens_for(Query, "before_insert")
7 changes: 4 additions & 3 deletions redash/query_runner/__init__.py
@@ -261,9 +261,10 @@ def supports_auto_limit(self):
def apply_auto_limit(self, query_text, should_apply_auto_limit):
return query_text

def gen_query_hash(self, query_text, set_auto_limit=False):
query_text = self.apply_auto_limit(query_text, set_auto_limit)
return utils.gen_query_hash(query_text)
def gen_query_hash(self, query_text, parameters={}, set_auto_limit=False):
if not self.supports_auto_limit:
set_auto_limit = False
return utils.gen_query_hash(query_text, parameters, set_auto_limit)


class BaseSQLQueryRunner(BaseQueryRunner):
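
A usage sketch of the runner-level API above, assuming a Redash environment where redash.query_runner is importable: for a runner that does not support auto limit, the set_auto_limit flag is neutralized before hashing, which is what the updated test_gen_query_hash_NoneSQL below verifies.

from redash.query_runner import BaseQueryRunner

runner = BaseQueryRunner({})
# BaseQueryRunner does not support auto limit, so the flag is forced to False
# before hashing and both calls produce the same value.
assert runner.gen_query_hash("select 1", {"n": 10}, set_auto_limit=True) == \
       runner.gen_query_hash("select 1", {"n": 10}, set_auto_limit=False)
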
13 changes: 8 additions & 5 deletions redash/tasks/queries/execution.py
@@ -27,8 +27,7 @@ def _unlock(query_hash, data_source_id):
redis_connection.delete(_job_lock_id(query_hash, data_source_id))


def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
query_hash = gen_query_hash(query)
def enqueue_query(query, query_hash, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
try_count = 0
job = None
@@ -99,7 +98,9 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
if not scheduled_query:
enqueue_kwargs["result_ttl"] = settings.JOB_EXPIRY_TIME

job = queue.enqueue(execute_query, query, data_source.id, metadata, **enqueue_kwargs)
job = queue.enqueue(
execute_query, query, data_source.id, metadata, query_hash=query_hash, **enqueue_kwargs
)

logger.info("[%s] Created new job: %s", query_hash, job.id)
pipe.set(
@@ -146,9 +147,10 @@ def _resolve_user(user_id, is_api_key, query_id):


class QueryExecutor:
def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
def __init__(self, query, query_hash, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
self.job = get_current_job()
self.query = query
self.query_hash = query_hash or gen_query_hash(query)
self.data_source_id = data_source_id
self.metadata = metadata
self.data_source = self._load_data_source()
@@ -162,7 +164,6 @@ def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_sche

# Close DB connection to prevent holding a connection for a long time while the query is executing.
models.db.session.close()
self.query_hash = gen_query_hash(self.query)
self.is_scheduled_query = is_scheduled_query
if self.is_scheduled_query:
# Load existing tracker or create a new one if the job was created before code update:
@@ -271,10 +272,12 @@ def execute_query(
user_id=None,
scheduled_query_id=None,
is_api_key=False,
query_hash=None,
):
try:
return QueryExecutor(
query,
query_hash,
data_source_id,
user_id,
is_api_key,
1 change: 1 addition & 0 deletions redash/tasks/queries/maintenance.py
@@ -93,6 +93,7 @@ def refresh_queries():
query_text = _apply_auto_limit(query_text, query)
enqueue_query(
query_text,
query.query_hash,
query.data_source,
query.user_id,
scheduled_query=query,
6 changes: 5 additions & 1 deletion redash/utils/__init__.py
@@ -50,7 +50,7 @@ def slugify(s):
return re.sub(r"[^a-z0-9_\-]+", "-", s.lower())


def gen_query_hash(sql):
def gen_query_hash(sql, parameters={}, auto_limit=False):
"""Return hash of the given query after stripping all comments, line breaks
and multiple spaces.
@@ -60,6 +60,10 @@ def gen_query_hash(sql):
"""
sql = COMMENTS_REGEX.sub("", sql)
sql = "".join(sql.split())

query_parameters = {"parameters": parameters, "auto_limit": auto_limit}
sql += "\n" + json.dumps(query_parameters, sort_keys=True, separators=(",", ":"))

return hashlib.md5(sql.encode("utf-8")).hexdigest()


2 changes: 1 addition & 1 deletion tests/factories.py
@@ -126,7 +126,7 @@ def __call__(self):
runtime=1,
retrieved_at=utcnow,
query_text="SELECT 1",
query_hash=gen_query_hash("SELECT 1"),
query_hash=gen_query_hash("SELECT 1", {}, False),
data_source=data_source_factory.create,
org_id=1,
)
1 change: 0 additions & 1 deletion tests/handlers/test_query_results.py
@@ -436,7 +436,6 @@ def test_renders_excel_file_when_rows_have_missing_columns(self):

class TestJobResource(BaseTestCase):
def test_cancels_queued_queries(self):

query = self.factory.create_query()
job_id = self.make_request(
"post",
2 changes: 1 addition & 1 deletion tests/models/test_queries.py
@@ -464,7 +464,7 @@ def setUp(self):
super(TestQueryUpdateLatestResult, self).setUp()
self.data_source = self.factory.data_source
self.query = "SELECT 1"
self.query_hash = gen_query_hash(self.query)
self.query_hash = gen_query_hash(self.query, {}, False)
self.runtime = 123
self.utcnow = utcnow()
self.data = {"columns": {}, "rows": []}
12 changes: 6 additions & 6 deletions tests/models/test_query_results.py
@@ -12,30 +12,30 @@ def test_get_latest_returns_none_if_not_found(self):

def test_get_latest_returns_when_found(self):
qr = self.factory.create_query_result()
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)

self.assertEqual(qr, found_query_result)

def test_get_latest_doesnt_return_query_from_different_data_source(self):
qr = self.factory.create_query_result()
data_source = self.factory.create_data_source()
found_query_result = models.QueryResult.get_latest(data_source, qr.query_text, 60)
found_query_result = models.QueryResult.get_latest(data_source, qr.query_hash, 60)

self.assertIsNone(found_query_result)

def test_get_latest_doesnt_return_if_ttl_expired(self):
yesterday = utcnow() - datetime.timedelta(days=1)
qr = self.factory.create_query_result(retrieved_at=yesterday)

found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=60)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=60)

self.assertIsNone(found_query_result)

def test_get_latest_returns_if_ttl_not_expired(self):
yesterday = utcnow() - datetime.timedelta(seconds=30)
qr = self.factory.create_query_result(retrieved_at=yesterday)

found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=120)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=120)

self.assertEqual(found_query_result, qr)

@@ -44,7 +44,7 @@ def test_get_latest_returns_the_most_recent_result(self):
self.factory.create_query_result(retrieved_at=yesterday)
qr = self.factory.create_query_result()

found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)

self.assertEqual(found_query_result.id, qr.id)

@@ -54,7 +54,7 @@ def test_get_latest_returns_the_last_cached_result_for_negative_ttl(self):

yesterday = utcnow() + datetime.timedelta(days=-1)
qr = self.factory.create_query_result(retrieved_at=yesterday)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, -1)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, -1)

self.assertEqual(found_query_result.id, qr.id)

13 changes: 6 additions & 7 deletions tests/query_runner/test_basesql_queryrunner.py
@@ -1,7 +1,6 @@
import unittest

from redash.query_runner import BaseQueryRunner, BaseSQLQueryRunner
from redash.utils import gen_query_hash


class TestBaseSQLQueryRunner(unittest.TestCase):
@@ -118,16 +117,16 @@ def test_apply_auto_limit_inline_comment(self):

def test_gen_query_hash_baseSQL(self):
origin_query_text = "select *"
expected_query_text = "select * LIMIT 1000"
base_runner = BaseQueryRunner({})
self.assertEqual(
base_runner.gen_query_hash(expected_query_text), self.query_runner.gen_query_hash(origin_query_text, True)
)
base_hash = self.query_runner.gen_query_hash(origin_query_text)
self.assertEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, False))
self.assertNotEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, True))

def test_gen_query_hash_NoneSQL(self):
origin_query_text = "select *"
base_runner = BaseQueryRunner({})
self.assertEqual(gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, True))
self.assertEqual(
base_runner.gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, {}, True)
)


if __name__ == "__main__":
39 changes: 26 additions & 13 deletions tests/tasks/test_queries.py
@@ -40,6 +40,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -48,6 +49,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
)
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -56,6 +58,7 @@ def test_multiple_enqueue_of_same_query(self, enqueue, _):
)
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -71,6 +74,7 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -83,6 +87,7 @@ def test_multiple_enqueue_of_expired_job(self, enqueue, fetch_job):

enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -98,6 +103,7 @@ def test_reenqueue_during_job_cancellation(self, enqueue, my_fetch_job):
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -115,6 +121,7 @@ def cancel_job(*args, **kwargs):

enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -131,6 +138,7 @@ def test_limits_query_time(self, _, enqueue, __):
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.query_hash,
query.data_source,
query.user_id,
False,
@@ -142,32 +150,37 @@ def test_limits_query_time(self, _, enqueue, __):
self.assertEqual(60, kwargs.get("job_timeout"))

def test_multiple_enqueue_of_different_query(self, enqueue, _):
query = self.factory.create_query()
query1 = self.factory.create_query(query_text="SELECT 1")
query2 = self.factory.create_query(query_text="SELECT 2")
query3 = self.factory.create_query(query_text="SELECT 3")

with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
query1.query_text,
query1.query_hash,
query1.data_source,
query1.user_id,
False,
None,
{"Username": "Arik", "query_id": query.id},
{"Username": "Arik", "query_id": query1.id},
)
enqueue_query(
query.query_text + "2",
query.data_source,
query.user_id,
query2.query_text,
query2.query_hash,
query2.data_source,
query2.user_id,
False,
None,
{"Username": "Arik", "query_id": query.id},
{"Username": "Arik", "query_id": query2.id},
)
enqueue_query(
query.query_text + "3",
query.data_source,
query.user_id,
query3.query_text,
query3.query_hash,
query3.data_source,
query3.user_id,
False,
None,
{"Username": "Arik", "query_id": query.id},
{"Username": "Arik", "query_id": query3.id},
)

self.assertEqual(3, enqueue.call_count)
