Binary file added client/app/assets/images/db-logos/trino.png
162 changes: 162 additions & 0 deletions redash/query_runner/trino.py
@@ -0,0 +1,162 @@
import logging
import sys

from redash.query_runner import *
from redash.utils import MaxQueryResultRowsExpection, json_dumps, json_loads

logger = logging.getLogger(__name__)

try:
    import trino
    from trino.exceptions import DatabaseError

    enabled = True
except ImportError:
    enabled = False

TRINO_TYPES_MAPPING = {
    "boolean": TYPE_BOOLEAN,

    "tinyint": TYPE_INTEGER,
    "smallint": TYPE_INTEGER,
    "integer": TYPE_INTEGER,
    "long": TYPE_INTEGER,
    "bigint": TYPE_INTEGER,

    "float": TYPE_FLOAT,
    "real": TYPE_FLOAT,
    "double": TYPE_FLOAT,

    "decimal": TYPE_INTEGER,

    "varchar": TYPE_STRING,
    "char": TYPE_STRING,
    "string": TYPE_STRING,
    "json": TYPE_STRING,

    "date": TYPE_DATE,
    "timestamp": TYPE_DATETIME,
}


class Trino(BaseQueryRunner):
    noop_query = "SELECT 1"
    should_annotate_query = False

    @classmethod
    def configuration_schema(cls):
        return {
            "type": "object",
            "properties": {
                "protocol": {"type": "string", "default": "http"},
                "host": {"type": "string"},
                "port": {"type": "number"},
                "username": {"type": "string"},
                "password": {"type": "string"},
                "catalog": {"type": "string"},
                "schema": {"type": "string"},
            },
            "order": [
                "protocol",
                "host",
                "port",
                "username",
                "password",
                "catalog",
                "schema",
            ],
            "required": ["host", "username"],
            "secret": ["password"],
        }

    @classmethod
    def enabled(cls):
        return enabled

    @classmethod
    def type(cls):
        return "trino"

    def get_schema(self, get_stats=False):
        query = """
            SELECT table_schema, table_name, column_name
            FROM information_schema.columns
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
        """
        results, error = self.run_query(query, None)

        if error is not None:
            self._handle_run_query_error(error)

        results = json_loads(results)
        schema = {}
        for row in results["rows"]:
            table_name = f'{row["table_schema"]}.{row["table_name"]}'

            if table_name not in schema:
                schema[table_name] = {"name": table_name, "columns": []}

            schema[table_name]["columns"].append(row["column_name"])

        return list(schema.values())

    def run_query(self, query, user, org=None):
        if self.configuration.get("password"):
            auth = trino.auth.BasicAuthentication(
                username=self.configuration.get("username"),
                password=self.configuration.get("password")
            )
        else:
            auth = trino.constants.DEFAULT_AUTH
        connection = trino.dbapi.connect(
            http_scheme=self.configuration.get("protocol", "http"),
            host=self.configuration.get("host", ""),
            port=self.configuration.get("port", 8080),
            catalog=self.configuration.get("catalog", "hive"),
            schema=self.configuration.get("schema", "default"),
            user=self.configuration.get("username"),
            auth=auth
        )

        cursor = connection.cursor()

        try:
            cursor.execute(query)
            results = cursor.fetchall()
            # Enforce the per-organization row limit before serializing the result set.
            max_query_result_rows = org.max_query_result_rows if org else sys.maxsize
            if len(results) > max_query_result_rows:
                raise MaxQueryResultRowsExpection(max_query_result_rows)
            description = cursor.description
            columns = self.fetch_columns([
                (c[0], TRINO_TYPES_MAPPING.get(c[1], None)) for c in description
            ])
            rows = [
                dict(zip([c["name"] for c in columns], r))
                for r in results
            ]
            data = {
                "columns": columns,
                "rows": rows
            }
            json_data = json_dumps(data)
            error = None
        except DatabaseError as db:
            json_data = None
            default_message = "Unspecified DatabaseError: {0}".format(str(db))
            if isinstance(db.args[0], dict):
                # Fall back to a dict (not a set) so .get("message") is safe when
                # "failureInfo" is missing from the error payload.
                message = db.args[0].get("failureInfo", {"message": None}).get("message")
            else:
                message = None
            error = default_message if message is None else message
        except MaxQueryResultRowsExpection as e:
            cursor.cancel()
            json_data = None
            error = str(e)
        except (KeyboardInterrupt, InterruptException, JobTimeoutException):
            cursor.cancel()
            raise

        return json_data, error


register(Trino)
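For reviewers, a minimal sketch of how the new runner could be exercised outside the Redash UI, assuming a Trino coordinator reachable at localhost:8080; the host, port, username, catalog, and schema below are hypothetical placeholders, not values taken from this PR:

from redash.query_runner.trino import Trino

# Hypothetical configuration; adjust to a real Trino deployment.
runner = Trino({
    "protocol": "http",
    "host": "localhost",
    "port": 8080,
    "username": "redash",
    "catalog": "hive",
    "schema": "default",
})

# run_query returns (json_data, error); with no org passed, the row limit
# falls back to sys.maxsize as in the implementation above.
json_data, error = runner.run_query("SELECT 1", user=None)
print(error or json_data)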
1 change: 1 addition & 0 deletions redash/settings/__init__.py
@@ -340,6 +340,7 @@ def email_server_is_configured():
"redash.query_runner.elasticsearch",
"redash.query_runner.amazon_elasticsearch",
"redash.query_runner.presto",
"redash.query_runner.trino",
"redash.query_runner.databricks",
"redash.query_runner.hive_ds",
"redash.query_runner.impala_ds",
3 changes: 2 additions & 1 deletion requirements.txt
@@ -55,7 +55,7 @@ user-agents==2.0
maxminddb-geolite2==2018.703
pypd==1.1.0
disposable-email-domains>=0.0.52
gevent==1.4.0
gevent==22.10.2
sshtunnel==0.1.5
supervisor==4.1.0
supervisor_checks==0.8.1
@@ -68,3 +68,4 @@ numpy==1.19.2
# Uncomment the requirement for ldap3 if using ldap.
# It is not included by default because of the GPL license conflict.
# ldap3==2.2.4
protobuf==3.19.0
3 changes: 2 additions & 1 deletion requirements_all_ds.txt
@@ -34,4 +34,5 @@ pydgraph==2.0.2
azure-kusto-data==0.0.35
pyexasol==0.12.0
python-rapidjson==0.8.0
pyodbc==4.0.28
pyodbc==4.0.28
trino~=0.305
2 changes: 1 addition & 1 deletion requirements_bundles.txt
@@ -4,5 +4,5 @@
# It's automatically installed when running npm run bundle

# These can be removed when upgrading to Python 3.x
importlib-metadata>=1.6 # remove when on 3.8
importlib-metadata==4.13.0 # remove when on 3.8
importlib_resources==1.5 # remove when on 3.9