Skip to content

Commit

Permalink
query cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
hrl20 committed Apr 17, 2024
1 parent da51d43 commit 5d7d1d5
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 11 deletions.
7 changes: 5 additions & 2 deletions dbt/adapters/duckdb/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ def close(cls, connection: Connection) -> Connection:
connection = super(SQLConnectionManager, cls).close(connection)
return connection

def cancel(self, connection):
pass
def cancel(self, connection: Connection):
logger.debug("cancelling query on connection {}. Details: {}".format(connection.name, connection))
self._ENV.cancel(connection)
logger.debug("query cancelled on connection {}".format(connection.name))


@contextmanager
def exception_handler(self, sql: str, connection_name="master"):
Expand Down
9 changes: 7 additions & 2 deletions dbt/adapters/duckdb/environments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ..plugins import BasePlugin
from ..utils import SourceConfig
from ..utils import TargetConfig
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.exceptions import DbtRuntimeError


Expand Down Expand Up @@ -114,9 +114,14 @@ def store_relation(self, plugin_name: str, target_config: TargetConfig) -> None:

def get_binding_char(self) -> str:
return "?"

def supports_comments(self) -> bool:
return self._supports_comments

@classmethod
@abc.abstractmethod
def cancel(cls, connection: Connection):
pass

@classmethod
def initialize_db(
Expand Down
6 changes: 5 additions & 1 deletion dbt/adapters/duckdb/environments/buenavista.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from . import Environment
from .. import credentials
from .. import utils
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.connection import AdapterResponse, Connection


class BVEnvironment(Environment):
Expand All @@ -30,6 +30,10 @@ def handle(self):
cursor = self.initialize_cursor(self.creds, conn.cursor())
cursor.close()
return conn

@classmethod
def cancel(cls, connection: Connection):
pass

def get_binding_char(self) -> str:
return "%s"
Expand Down
5 changes: 4 additions & 1 deletion dbt/adapters/duckdb/environments/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from . import Environment
from .. import credentials
from .. import utils
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.exceptions import DbtRuntimeError


Expand Down Expand Up @@ -58,6 +58,9 @@ def notify_closed(self):
if self.handle_count == 0 and not self._keep_open:
self.close()

def cancel(cls, connection: Connection):
connection.handle.cursor().interrupt()

def handle(self):
# Extensions/settings need to be configured per cursor
with self.lock:
Expand Down
9 changes: 4 additions & 5 deletions dbt/adapters/duckdb/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
TEMP_SCHEMA_NAME = "temp_schema_name"
DEFAULT_TEMP_SCHEMA_NAME = "dbt_temp"


if TYPE_CHECKING:
import agate

Expand All @@ -51,10 +52,6 @@ class DuckDBAdapter(SQLAdapter):
def date_function(cls) -> str:
return "now()"

@classmethod
def is_cancelable(cls) -> bool:
return False

def debug_query(self):
self.execute("select 1 as id")

Expand All @@ -63,7 +60,7 @@ def is_motherduck(self):
return self.config.credentials.is_motherduck

@available
def convert_datetimes_to_strs(self, table: "agate.Table") -> "agate.Table":
def convert_datetimes_to_strs(self, table: agate.Table) -> agate.Table:
import agate

for column in table.columns:
Expand Down Expand Up @@ -240,6 +237,7 @@ def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional
return f"references {constraint.expression}"
else:
return super().render_column_constraint(constraint)


def _clean_up_temp_relation_for_incremental(self, config):
if self.is_motherduck():
Expand All @@ -260,6 +258,7 @@ def pre_model_hook(self, config: Any) -> None:
self._clean_up_temp_relation_for_incremental(config)
super().pre_model_hook(config)


@available
def get_temp_relation_path(self, model: Any):
"""This is a workaround to enable incremental models on MotherDuck because it
Expand Down

0 comments on commit 5d7d1d5

Please sign in to comment.