Change query_hash calculation method with separate params #6960

Open · wants to merge 2 commits into master
@@ -26,7 +26,7 @@ def change_query_hash(conn, table, query_text_to):
            table
            .update()
            .where(table.c.id == record.id)
-            .values(query_hash=gen_query_hash(query_text)))
+            .values(query_hash=gen_query_hash(query_text, {}, False)))


def upgrade():
@@ -0,0 +1,78 @@
"""calc query_hash with separate params

Revision ID: e4d9a0b448cb
Revises: 7205816877ec
Create Date: 2024-05-20 00:48:25.674748

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table

from redash.query_runner import get_query_runner
from redash.utils import gen_query_hash

# revision identifiers, used by Alembic.
revision = "e4d9a0b448cb"
down_revision = "7205816877ec"
branch_labels = None
depends_on = None


queries = table(
    "queries",
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("query", sa.Text),
    sa.Column("query_hash", sa.String(length=32)),
    sa.Column("data_source_id", sa.Integer),
    sa.Column("options", sa.JSON))

data_sources = table(
    "data_sources",
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("type", sa.String(length=255)))


def load_data_sources(conn):
    data_source_type_map = {}
    for data_source in conn.execute(data_sources.select()):
        data_source_type_map[data_source.id] = data_source.type
    return data_source_type_map


def upgrade():
    conn = op.get_bind()

    data_source_type_map = load_data_sources(conn)

    for query in conn.execute(queries.select()):
        data_source_type = data_source_type_map.get(query.data_source_id)
        if not data_source_type:
            continue

        query_runner = get_query_runner(data_source_type, {})
        if not query_runner:
            print(f"query #{query.id}: can't get query runner '{data_source_type}'")
            continue

        parameters_dict = {p["name"]: p.get("value") for p in query.options.get("parameters", [])}

        if query_runner.supports_auto_limit:
            should_apply_auto_limit = query.options.get("apply_auto_limit", False)
        else:
            should_apply_auto_limit = False

        new_query_hash = gen_query_hash(query.query, parameters_dict, should_apply_auto_limit)

        conn.execute(
            queries
            .update()
            .where(queries.c.id == query.id)
            .values(query_hash=new_query_hash))


def downgrade():
    # We can't recalculate the old query_hash: the dynamic date(-range)
    # parameter values were lost, which is the root cause of the problem
    # this change fixes.
    pass
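
To make the backfill above concrete, here is a minimal sketch of how parameters_dict is derived from a query's stored options. The options payload is hypothetical; the dict comprehension is the one used in upgrade():

options = {
    "apply_auto_limit": True,
    "parameters": [
        {"name": "start_date", "type": "date", "value": "2024-05-01"},
        {"name": "row_limit", "type": "number"},  # no stored value, so this maps to None
    ],
}
parameters_dict = {p["name"]: p.get("value") for p in options.get("parameters", [])}
# parameters_dict == {"start_date": "2024-05-01", "row_limit": None}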
5 changes: 4 additions & 1 deletion redash/handlers/query_results.py
@@ -68,6 +68,8 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,

        return error_response(message)

+    query_hash = data_source.query_runner.gen_query_hash(query.text, parameters, should_apply_auto_limit)
+
    try:
        query.apply(parameters)
    except (InvalidParameterError, QueryDetachedFromDataSourceError) as e:
@@ -81,7 +83,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
    if max_age == 0:
        query_result = None
    else:
-        query_result = models.QueryResult.get_latest(data_source, query_text, max_age)
+        query_result = models.QueryResult.get_latest(data_source, query_hash, max_age)

    record_event(
        current_user.org,
@@ -102,6 +104,7 @@ def run_query(query, parameters, data_source, query_id, should_apply_auto_limit,
    else:
        job = enqueue_query(
            query_text,
+            query_hash,
            data_source,
            current_user.id,
            current_user.is_api_user(),
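
With the new get_latest signature (see redash/models/__init__.py below), the handler computes the hash once and reuses it for both the cache lookup and the enqueue call. A hedged call-site sketch, with names as in the diff and an illustrative max_age:

query_hash = data_source.query_runner.gen_query_hash(query.text, parameters, should_apply_auto_limit)
cached = models.QueryResult.get_latest(data_source, query_hash, max_age=60)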
10 changes: 3 additions & 7 deletions redash/models/__init__.py
@@ -70,7 +70,6 @@
)
from redash.utils import (
    base_url,
-    gen_query_hash,
    generate_token,
    json_dumps,
    json_loads,
@@ -343,9 +342,7 @@ def unused(cls, days=7):
        )

    @classmethod
-    def get_latest(cls, data_source, query, max_age=0):
-        query_hash = gen_query_hash(query)
-
+    def get_latest(cls, data_source, query_hash, max_age=0):
        if max_age == -1 and settings.QUERY_RESULTS_EXPIRED_TTL_ENABLED:
            max_age = settings.QUERY_RESULTS_EXPIRED_TTL

@@ -815,20 +812,19 @@ def dashboard_api_keys(self):
    def update_query_hash(self):
        should_apply_auto_limit = self.options.get("apply_auto_limit", False) if self.options else False
        query_runner = self.data_source.query_runner if self.data_source else BaseQueryRunner({})
-        query_text = self.query_text

        parameters_dict = {p["name"]: p.get("value") for p in self.parameters} if self.options else {}
        if any(parameters_dict):
            try:
-                query_text = self.parameterized.apply(parameters_dict).query
+                self.parameterized.apply(parameters_dict).query
            except InvalidParameterError as e:
                logging.info(f"Unable to update hash for query {self.id} because of invalid parameters: {str(e)}")
            except QueryDetachedFromDataSourceError as e:
                logging.info(
                    f"Unable to update hash for query {self.id} because dropdown query {e.query_id} is unattached from the data source"
                )

-        self.query_hash = query_runner.gen_query_hash(query_text, should_apply_auto_limit)
+        self.query_hash = query_runner.gen_query_hash(self.query_text, parameters_dict, should_apply_auto_limit)
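
Why hashing the template together with the parameter values matters: the old update_query_hash substituted values into the SQL before hashing, so dynamic date(-range) defaults produced a hash that drifted over time and could not be recomputed later. A small illustration of the new behavior, with hypothetical values and gen_query_hash as defined in redash/utils below:

template = "SELECT count(*) FROM events WHERE day = '{{ the_date }}'"
h_monday = gen_query_hash(template, {"the_date": "2024-05-20"}, False)
h_tuesday = gen_query_hash(template, {"the_date": "2024-05-21"}, False)
assert h_monday != h_tuesday  # each parameter set now gets its own cache entry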
7 changes: 4 additions & 3 deletions redash/query_runner/__init__.py
@@ -261,9 +261,10 @@ def supports_auto_limit(self):
    def apply_auto_limit(self, query_text, should_apply_auto_limit):
        return query_text

-    def gen_query_hash(self, query_text, set_auto_limit=False):
-        query_text = self.apply_auto_limit(query_text, set_auto_limit)
-        return utils.gen_query_hash(query_text)
+    def gen_query_hash(self, query_text, parameters={}, set_auto_limit=False):
+        if not self.supports_auto_limit:
+            set_auto_limit = False
+        return utils.gen_query_hash(query_text, parameters, set_auto_limit)


class BaseSQLQueryRunner(BaseQueryRunner):
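
One consequence of the supports_auto_limit guard above, sketched as a quick check (it mirrors the updated test_gen_query_hash_NoneSQL at the bottom of this diff): for a runner that does not support auto limit, set_auto_limit is coerced to False and cannot affect the hash.

runner = BaseQueryRunner({})  # supports_auto_limit is False on the base class
assert runner.gen_query_hash("select 42", {}, True) == runner.gen_query_hash("select 42", {}, False)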
13 changes: 8 additions & 5 deletions redash/tasks/queries/execution.py
@@ -27,8 +27,7 @@ def _unlock(query_hash, data_source_id):
    redis_connection.delete(_job_lock_id(query_hash, data_source_id))


-def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
-    query_hash = gen_query_hash(query)
+def enqueue_query(query, query_hash, data_source, user_id, is_api_key=False, scheduled_query=None, metadata={}):
    logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
    try_count = 0
    job = None
@@ -99,7 +98,9 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
        if not scheduled_query:
            enqueue_kwargs["result_ttl"] = settings.JOB_EXPIRY_TIME

-        job = queue.enqueue(execute_query, query, data_source.id, metadata, **enqueue_kwargs)
+        job = queue.enqueue(
+            execute_query, query, data_source.id, metadata, query_hash=query_hash, **enqueue_kwargs
+        )

        logger.info("[%s] Created new job: %s", query_hash, job.id)
        pipe.set(
@@ -146,9 +147,10 @@ def _resolve_user(user_id, is_api_key, query_id):


class QueryExecutor:
-    def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
+    def __init__(self, query, query_hash, data_source_id, user_id, is_api_key, metadata, is_scheduled_query):
        self.job = get_current_job()
        self.query = query
+        self.query_hash = query_hash or gen_query_hash(query)
        self.data_source_id = data_source_id
        self.metadata = metadata
        self.data_source = self._load_data_source()
@@ -162,7 +164,6 @@ def __init__(self, query, data_source_id, user_id, is_api_key, metadata, is_sche
        # Close DB connection to prevent holding a connection for a long time while the query is executing.
        models.db.session.close()
-        self.query_hash = gen_query_hash(self.query)
        self.is_scheduled_query = is_scheduled_query
        if self.is_scheduled_query:
            # Load existing tracker or create a new one if the job was created before code update:
@@ -271,10 +272,12 @@ def execute_query(
    user_id=None,
    scheduled_query_id=None,
    is_api_key=False,
+    query_hash=None,
):
    try:
        return QueryExecutor(
            query,
+            query_hash,
            data_source_id,
            user_id,
            is_api_key,
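
The `query_hash or gen_query_hash(query)` fallback in QueryExecutor.__init__ keeps jobs enqueued before this change working: such jobs carry no query_hash keyword, so execute_query receives None and the executor recomputes the hash. A sketch of the two paths, with hypothetical argument values:

# New-style job: the caller precomputed the hash and it rides along through RQ.
execute_query("SELECT 1", 1, {}, query_hash="1a2b3c...")  # hypothetical hash value
# Pre-upgrade job still sitting in the queue: no kwarg, so the executor falls back.
execute_query("SELECT 1", 1, {})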
1 change: 1 addition & 0 deletions redash/tasks/queries/maintenance.py
@@ -93,6 +93,7 @@ def refresh_queries():
            query_text = _apply_auto_limit(query_text, query)
            enqueue_query(
                query_text,
+                query.query_hash,
                query.data_source,
                query.user_id,
                scheduled_query=query,
6 changes: 5 additions & 1 deletion redash/utils/__init__.py
@@ -50,7 +50,7 @@ def slugify(s):
    return re.sub(r"[^a-z0-9_\-]+", "-", s.lower())


-def gen_query_hash(sql):
+def gen_query_hash(sql, parameters={}, auto_limit=False):
    """Return hash of the given query after stripping all comments, line breaks
    and multiple spaces.

@@ -60,6 +60,10 @@ def gen_query_hash(sql, parameters={}, auto_limit=False):
    """
    sql = COMMENTS_REGEX.sub("", sql)
    sql = "".join(sql.split())
+
+    query_parameters = {"parameters": parameters, "auto_limit": auto_limit}
+    sql += "\n" + json.dumps(query_parameters, sort_keys=True, separators=(",", ":"))
+
    return hashlib.md5(sql.encode("utf-8")).hexdigest()
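
A quick check of the canonical suffix the updated gen_query_hash appends before hashing (a sketch; sort_keys and separators as in the function above):

import json
suffix = json.dumps({"parameters": {"b": 1, "a": 2}, "auto_limit": False}, sort_keys=True, separators=(",", ":"))
assert suffix == '{"auto_limit":false,"parameters":{"a":2,"b":1}}'
# sort_keys makes the hash independent of parameter insertion order.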
2 changes: 1 addition & 1 deletion tests/factories.py
@@ -126,7 +126,7 @@ def __call__(self):
    runtime=1,
    retrieved_at=utcnow,
    query_text="SELECT 1",
-    query_hash=gen_query_hash("SELECT 1"),
+    query_hash=gen_query_hash("SELECT 1", {}, False),
    data_source=data_source_factory.create,
    org_id=1,
)
2 changes: 1 addition & 1 deletion tests/models/test_queries.py
@@ -464,7 +464,7 @@ def setUp(self):
        super(TestQueryUpdateLatestResult, self).setUp()
        self.data_source = self.factory.data_source
        self.query = "SELECT 1"
-        self.query_hash = gen_query_hash(self.query)
+        self.query_hash = gen_query_hash(self.query, {}, False)
        self.runtime = 123
        self.utcnow = utcnow()
        self.data = {"columns": {}, "rows": []}
12 changes: 6 additions & 6 deletions tests/models/test_query_results.py
@@ -12,30 +12,30 @@ def test_get_latest_returns_none_if_not_found(self):

    def test_get_latest_returns_when_found(self):
        qr = self.factory.create_query_result()
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)

        self.assertEqual(qr, found_query_result)

    def test_get_latest_doesnt_return_query_from_different_data_source(self):
        qr = self.factory.create_query_result()
        data_source = self.factory.create_data_source()
-        found_query_result = models.QueryResult.get_latest(data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(data_source, qr.query_hash, 60)

        self.assertIsNone(found_query_result)

    def test_get_latest_doesnt_return_if_ttl_expired(self):
        yesterday = utcnow() - datetime.timedelta(days=1)
        qr = self.factory.create_query_result(retrieved_at=yesterday)

-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=60)

        self.assertIsNone(found_query_result)

    def test_get_latest_returns_if_ttl_not_expired(self):
        yesterday = utcnow() - datetime.timedelta(seconds=30)
        qr = self.factory.create_query_result(retrieved_at=yesterday)

-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, max_age=120)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, max_age=120)

        self.assertEqual(found_query_result, qr)

@@ -44,7 +44,7 @@ def test_get_latest_returns_the_most_recent_result(self):
        self.factory.create_query_result(retrieved_at=yesterday)
        qr = self.factory.create_query_result()

-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, 60)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, 60)

        self.assertEqual(found_query_result.id, qr.id)

@@ -54,7 +54,7 @@ def test_get_latest_returns_the_last_cached_result_for_negative_ttl(self):

        yesterday = utcnow() + datetime.timedelta(days=-1)
        qr = self.factory.create_query_result(retrieved_at=yesterday)
-        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_text, -1)
+        found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query_hash, -1)

        self.assertEqual(found_query_result.id, qr.id)
13 changes: 6 additions & 7 deletions tests/query_runner/test_basesql_queryrunner.py
@@ -1,7 +1,6 @@
import unittest

from redash.query_runner import BaseQueryRunner, BaseSQLQueryRunner
-from redash.utils import gen_query_hash


class TestBaseSQLQueryRunner(unittest.TestCase):
@@ -118,16 +117,16 @@ def test_apply_auto_limit_inline_comment(self):

    def test_gen_query_hash_baseSQL(self):
        origin_query_text = "select *"
-        expected_query_text = "select * LIMIT 1000"
-        base_runner = BaseQueryRunner({})
-        self.assertEqual(
-            base_runner.gen_query_hash(expected_query_text), self.query_runner.gen_query_hash(origin_query_text, True)
-        )
+        base_hash = self.query_runner.gen_query_hash(origin_query_text)
+        self.assertEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, False))
+        self.assertNotEqual(base_hash, self.query_runner.gen_query_hash(origin_query_text, {}, True))

    def test_gen_query_hash_NoneSQL(self):
        origin_query_text = "select *"
        base_runner = BaseQueryRunner({})
-        self.assertEqual(gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, True))
+        self.assertEqual(
+            base_runner.gen_query_hash(origin_query_text), base_runner.gen_query_hash(origin_query_text, {}, True)
+        )


if __name__ == "__main__":