diff --git a/client/app/assets/images/db-logos/trino.png b/client/app/assets/images/db-logos/trino.png
new file mode 100644
index 0000000..904db40
Binary files /dev/null and b/client/app/assets/images/db-logos/trino.png differ
diff --git a/redash/query_runner/trino.py b/redash/query_runner/trino.py
new file mode 100644
index 0000000..1ddce1f
--- /dev/null
+++ b/redash/query_runner/trino.py
@@ -0,0 +1,162 @@
+import logging
+import sys
+
+from redash.query_runner import *
+from redash.utils import MaxQueryResultRowsExpection, json_dumps, json_loads
+
+logger = logging.getLogger(__name__)
+
+try:
+    import trino
+    from trino.exceptions import DatabaseError
+
+    enabled = True
+except ImportError:
+    enabled = False
+
+# Map Trino column type names to redash display types.
+TRINO_TYPES_MAPPING = {
+    "boolean": TYPE_BOOLEAN,
+
+    "tinyint": TYPE_INTEGER,
+    "smallint": TYPE_INTEGER,
+    "integer": TYPE_INTEGER,
+    "long": TYPE_INTEGER,
+    "bigint": TYPE_INTEGER,
+
+    "float": TYPE_FLOAT,
+    "real": TYPE_FLOAT,
+    "double": TYPE_FLOAT,
+
+    "decimal": TYPE_INTEGER,
+
+    "varchar": TYPE_STRING,
+    "char": TYPE_STRING,
+    "string": TYPE_STRING,
+    "json": TYPE_STRING,
+
+    "date": TYPE_DATE,
+    "timestamp": TYPE_DATETIME,
+}
+
+
+class Trino(BaseQueryRunner):
+    """Query runner for the Trino distributed SQL engine."""
+
+    noop_query = "SELECT 1"
+    should_annotate_query = False
+
+    @classmethod
+    def configuration_schema(cls):
+        return {
+            "type": "object",
+            "properties": {
+                "protocol": {"type": "string", "default": "http"},
+                "host": {"type": "string"},
+                "port": {"type": "number"},
+                "username": {"type": "string"},
+                "password": {"type": "string"},
+                "catalog": {"type": "string"},
+                "schema": {"type": "string"},
+            },
+            "order": [
+                "protocol",
+                "host",
+                "port",
+                "username",
+                "password",
+                "catalog",
+                "schema",
+            ],
+            "required": ["host", "username"],
+            "secret": ["password"]
+        }
+
+    @classmethod
+    def enabled(cls):
+        return enabled
+
+    @classmethod
+    def type(cls):
+        return "trino"
+
+    def get_schema(self, get_stats=False):
+        # Enumerate tables/columns visible in the configured catalog.
+        query = """
+            SELECT table_schema, table_name, column_name
+            FROM information_schema.columns
+            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
+        """
+        results, error = self.run_query(query, None)
+
+        if error is not None:
+            self._handle_run_query_error(error)
+
+        results = json_loads(results)
+        schema = {}
+        for row in results["rows"]:
+            table_name = f'{row["table_schema"]}.{row["table_name"]}'
+
+            if table_name not in schema:
+                schema[table_name] = {"name": table_name, "columns": []}
+
+            schema[table_name]["columns"].append(row["column_name"])
+
+        return list(schema.values())
+
+    def run_query(self, query, user, org=None):
+        if self.configuration.get("password"):
+            auth = trino.auth.BasicAuthentication(
+                username=self.configuration.get("username"),
+                password=self.configuration.get("password")
+            )
+        else:
+            auth = trino.constants.DEFAULT_AUTH
+        connection = trino.dbapi.connect(
+            http_scheme=self.configuration.get("protocol", "http"),
+            host=self.configuration.get("host", ""),
+            port=self.configuration.get("port", 8080),
+            catalog=self.configuration.get("catalog", "hive"),
+            schema=self.configuration.get("schema", "default"),
+            user=self.configuration.get("username"),
+            auth=auth
+        )
+
+        cursor = connection.cursor()
+
+        try:
+            cursor.execute(query)
+            results = cursor.fetchall()
+            max_query_result_rows = org.max_query_result_rows if org else sys.maxsize
+            if len(results) > max_query_result_rows:
+                raise MaxQueryResultRowsExpection(max_query_result_rows)
+            description = cursor.description
+            columns = self.fetch_columns([
+                (c[0], TRINO_TYPES_MAPPING.get(c[1], None)) for c in description
+            ])
+            rows = [
+                dict(zip([c["name"] for c in columns], r))
+                for r in results
+            ]
+            data = {
+                "columns": columns,
+                "rows": rows
+            }
+            json_data = json_dumps(data)
+            error = None
+        except DatabaseError as db:
+            json_data = None
+            default_message = "Unspecified DatabaseError: {0}".format(str(db))
+            if isinstance(db.args[0], dict):
+                message = db.args[0].get("failureInfo", {"message": None}).get("message")
+            else:
+                message = None
+            error = default_message if message is None else message
+        except MaxQueryResultRowsExpection as e:
+            cursor.cancel()
+            json_data = None
+            error = str(e)
+        except (KeyboardInterrupt, InterruptException, JobTimeoutException):
+            cursor.cancel()
+            raise
+
+        return json_data, error
+
+
+register(Trino)
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index 7f59775..10547a4 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -340,6 +340,7 @@ def email_server_is_configured():
     "redash.query_runner.elasticsearch",
     "redash.query_runner.amazon_elasticsearch",
     "redash.query_runner.presto",
+    "redash.query_runner.trino",
     "redash.query_runner.databricks",
     "redash.query_runner.hive_ds",
     "redash.query_runner.impala_ds",
diff --git a/requirements.txt b/requirements.txt
index e8fd44e..c458dc5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -55,7 +55,7 @@ user-agents==2.0
 maxminddb-geolite2==2018.703
 pypd==1.1.0
 disposable-email-domains>=0.0.52
-gevent==1.4.0
+gevent==22.10.2
 sshtunnel==0.1.5
 supervisor==4.1.0
 supervisor_checks==0.8.1
@@ -68,3 +68,4 @@ numpy==1.19.2
 # Uncomment the requirement for ldap3 if using ldap.
 # It is not included by default because of the GPL license conflict.
 # ldap3==2.2.4
+protobuf==3.19.0
diff --git a/requirements_all_ds.txt b/requirements_all_ds.txt
index 5e24e17..3ca249c 100644
--- a/requirements_all_ds.txt
+++ b/requirements_all_ds.txt
@@ -34,4 +34,5 @@ pydgraph==2.0.2
 azure-kusto-data==0.0.35
 pyexasol==0.12.0
 python-rapidjson==0.8.0
-pyodbc==4.0.28
\ No newline at end of file
+pyodbc==4.0.28
+trino~=0.305
diff --git a/requirements_bundles.txt b/requirements_bundles.txt
index 3f57a20..bbdaf8e 100644
--- a/requirements_bundles.txt
+++ b/requirements_bundles.txt
@@ -4,5 +4,5 @@
 # It's automatically installed when running npm run bundle
 # These can be removed when upgrading to Python 3.x
 
-importlib-metadata>=1.6 # remove when on 3.8
+importlib-metadata==4.13.0 # remove when on 3.8
 importlib_resources==1.5 # remove when on 3.9