Python SDK: SQL Compute Module

This compute module provides additional functionality to run SQL computations on tabular input data. The SDK quick start tutorial provides an example of how this module can be used.

When adding SQL compute nodes to your data clean room, you first need to provide an enclave specification for the SQL worker enclave to the DataRoomBuilder.

The currently available enclave specifications for SQL workers are listed in the documentation of the main SDK package.
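As a quick orientation, the sketch below shows how such a specification might be passed to the DataRoomBuilder. The names used here (the enclave_specifications registry, the specification identifiers, and the enclave_specs argument) are assumptions for illustration only; check the main SDK documentation for the exact, currently supported values.

import decentriq_platform as dq

# Assumption: the main package exposes an `enclave_specifications` registry and
# `DataRoomBuilder` accepts the selected specifications via `enclave_specs`.
# The identifiers below are placeholders, not guaranteed version strings.
enclave_specs = dq.enclave_specifications.versions([
    "decentriq.driver:v1",      # placeholder driver enclave spec
    "decentriq.sql-worker:v1",  # placeholder SQL worker enclave spec
])
builder = dq.DataRoomBuilder("My data clean room", enclave_specs=enclave_specs)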


"""
.. include:: ../../../decentriq_platform_docs/sql_getting_started.md
---
"""
__docformat__ = "restructuredtext"

from .compute import SqlCompute, SqlSchemaVerifier
from .proto.compute_sql_pb2 import (
    PrimitiveType,
)
from .helpers import (
    TabularDataNodeBuilder,
    read_sql_query_result_as_string,
    read_input_csv_file,
    read_input_csv_string,
    upload_and_publish_tabular_dataset,
)


__all__ = [
    "SqlCompute",
    "TabularDataNodeBuilder",
    "PrimitiveType",
    "read_input_csv_file",
    "read_input_csv_string",
    "upload_and_publish_tabular_dataset",
    "read_sql_query_result_as_string",
    "SqlSchemaVerifier",
]
#   class SqlCompute(decentriq_platform.node.Node):
class SqlCompute(Node):
    """
    Computation node to execute an SQL query.
    """

    def __init__(
            self,
            name: str,
            sql_statement: str,
            dependencies: List[Tuple[str, str]],
            *,
            privacy_settings: Optional[PrivacySettings] = None,
            constraints: Optional[List[Constraint]] = None,
    ) -> None:
        sql_worker_configuration = SqlWorkerConfiguration(
            computation=ComputationConfiguration(
                sqlStatement=sql_statement,
                privacySettings=privacy_settings,
                constraints=constraints,
                tableDependencyMappings=[
                    TableDependencyMapping(table=table, dependency=node_id)
                    for table, node_id in dependencies
                ]
            )
        )
        config = serialize_length_delimited(sql_worker_configuration)

        super().__init__(
            name,
            config=config,
            enclave_type="decentriq.sql-worker",
            dependencies=[node_id for _, node_id in dependencies],
            output_format=ComputeNodeFormat.ZIP
        )

Computation node to execute an SQL query.
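A minimal usage sketch, assuming an existing DataRoomBuilder instance named builder and a validated table node with id "my_table" (for example one created via TabularDataNodeBuilder); the column names in the query are illustrative:

from decentriq_platform.sql import SqlCompute

# Each dependency maps the table name used in the SQL statement to the id of the
# node that provides the validated data for that table.
query_node = SqlCompute(
    name="average_salary",
    sql_statement="SELECT city, AVG(salary) FROM my_table GROUP BY city",
    dependencies=[("my_table", "my_table")],
)

# `add_compute_node` with an explicit `node_id` mirrors its use elsewhere in this module.
builder.add_compute_node(query_node, node_id="average_salary")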

#   SqlCompute( name: str, sql_statement: str, dependencies: List[Tuple[str, str]], *, privacy_settings: Optional[compute_sql_pb2.PrivacySettings] = None, constraints: Optional[List[compute_sql_pb2.Constraint]] = None )
#   class TabularDataNodeBuilder:
class TabularDataNodeBuilder:
    """
    Helper class to construct the triplet of nodes needed for handling tabular
    input data: a data node to store the data, a schema validation computation,
    and a trigger node for that validation.

    In a data clean room, computations and the data they depend on are expressed in terms
    of graphs where computations are represented by *compute nodes* and input datasets are
    represented by *data nodes*.
    With tabular input data, it is possible to validate the input data and check whether it
    corresponds to a pre-defined schema. This validation is a computation in itself, so in
    order to validate the input data, we need to add an additional compute node on top of the
    data node. Because it must be possible to trigger this validation *while restricting access
    to the resulting data*, another special node, the *noop node*, needs to be added.

    This class will add the necessary nodes to your data clean room.

    After having constructed an object of this class, call its `add_to_builder` method
    and pass it a `DataRoomBuilder` instance. This will add the necessary nodes and required
    user permissions to the builder instance.

    Data should be published to the node with id `input_node_id` and be read from `output_node_id`.
    The id of the validation computation can be accessed using the field `validation_computation_id`.

    This class uses a special convention for the ids of the nodes added to the data room based on
    the table name given to the builder. Assuming you use `"my_table"` as the `table_name` when
    instantiating the `TabularDataNodeBuilder`, then the following three nodes will be added:

    1. `@table/my_table/dataset` - the data node to which you can upload data.
    2. `my_table` - the id of the compute node that verifies the schema of the data.
        Subsequent SQL compute nodes should read data from this node.
    3. `@table/my_table/validation` - the id of a special trigger node that can be used to trigger the validation computation.

    The helper function `upload_and_publish_tabular_dataset` will upload, publish, and validate your
    data automatically, without you having to worry about internal naming.
    """
    def __init__(
            self,
            table_name: str,
            schema: List[Tuple[str, Any, bool]],
            *,
            is_required: bool = False
        ):
        """
        Create a `TabularDataNodeBuilder`.

        **Parameters**:
        - `table_name`: What your dataset should be called. This is the name you will later use in
            your SQL queries.
        - `schema`: The list of columns. This is a list of tuples, each containing the name of the column,
            the data type (`decentriq_platform.sql.proto.PrimitiveType`), and whether the column is nullable (can have empty values).
            The data type is an enum-like object with values `PrimitiveType.STRING`, `PrimitiveType.INT64`,
            `PrimitiveType.FLOAT64`.
        - `is_required`: Whether the dataset needs to be present for computations to be triggered.
        """
        self.table_name = table_name
        self.is_required = is_required
        self._leaf_node_id = _data_node_id(table_name)
        self._verifier_node_id = table_name

        self.validation_computation_id = _noop_node_id(table_name)
        """The id of the computation that will perform the data validation."""

        self._verifier = SqlSchemaVerifier(
            self._verifier_node_id,
            input_data_node=self._leaf_node_id,
            columns=schema,
        )

        self._noop = Noop(self.validation_computation_id, dependencies=[self._verifier_node_id])

    def add_to_builder(
            self,
            builder: DataRoomBuilder,
            authentication: AuthenticationMethod,
            users: List[str]
    ) -> Tuple[str, str]:
        """
        Configure the given `DataRoomBuilder` to build the final data clean room with the
        necessary compute and data nodes.
        This call will also add the necessary permissions to the data room
        builder that let each user in the list of users perform the following
        tasks:

        1. Upload data to the data node.
        2. Use the data in downstream computations while making sure
           the schema is valid.
        3. Trigger the schema validation step separately as its own computation.

        **Parameters**:
        - `builder`: The builder object to which to add the data and compute as well as
            the permissions.
        - `authentication`: The authentication method used to authenticate the users
            from within the enclave.
        - `users`: A list of email addresses that will be given permissions both for
            the validation of the data as well as the uploading of data.

        **Returns**:
        A tuple containing, as the first element, the id of the data node, and,
        as a second element, the id of the verification computation.
        The data node id must be used when publishing data to the data node.
        The id of the verification computation must be used when depending on
        this dataset from downstream computations (e.g. SQL queries).
        """
        builder.add_data_node(
            self._leaf_node_id,
            is_required=self.is_required,
            node_id=self._leaf_node_id
        )
        builder.add_compute_node(self._verifier, node_id=self._verifier_node_id)
        builder.add_compute_node(
            self._noop,
            node_id=self.validation_computation_id
        )
        for email in users:
            builder.add_user_permission(
                email=email,
                authentication_method=authentication,
                permissions=[
                    self.validation_permission,
                    self.leaf_crud_permission
                ]
            )
        return (self._leaf_node_id, self._verifier_node_id)

    @property
    def validation_permission(self) -> Permission:
        """The permission required to trigger the data validation."""
        return Permissions.execute_compute(self.validation_computation_id)

    @property
    def leaf_crud_permission(self) -> Permission:
        """The permission required to upload the raw, non-validated data."""
        return Permissions.leaf_crud(self._leaf_node_id)

    @property
    def input_node_id(self) -> str:
        """The node id to which data should be uploaded."""
        return self._leaf_node_id

    @property
    def output_node_id(self) -> str:
        """
        The id of the node from which the validated data can be read.
        This id needs to be specified in the list of dependencies of any
        downstream computations (such as SQL queries).
        """
        return self._verifier_node_id

Helper class to construct the triplet of nodes needed for handling tabular input data: a data node to store the data, a schema validation computation, and a trigger node for that validation.

In a data clean room, computations and the data they depend on are expressed in terms of graphs where computations are represented by compute nodes and input datasets are represented by data nodes. With tabular input data, it is possible to validate the input data and check whether it corresponds to a pre-defined schema. This validation is a computation in itself, so in order to validate the input data, we need to add an additional compute node on top of the data node. Because it must be possible to trigger this validation while restricting access to the resulting data, another special node, the noop node, needs to be added.

This class will add the necessary nodes to your data clean room.

After having constructed an object of this class, call its add_to_builder method and pass it a DataRoomBuilder instance. This will add the necessary nodes and required user permissions to the builder instance.

Data should be published to the node with id input_node_id and be read from output_node_id. The id of the validation computation can be accessed using the field validation_computation_id.

This class uses a special convention for the ids of the nodes added to the data room based on the table name given to the builder. Assuming you use "my_table" as the table_name when instantiating the TabularDataNodeBuilder, then the following three nodes will be added:

  1. @table/my_table/dataset - the data node to which you can upload data.
  2. my_table - the id of the compute node that verifies the schema of the data. Subsequent SQL compute nodes should read data from this node.
  3. @table/my_table/validation - the id of a special trigger node that can be used to trigger the validation computation.

The helper function upload_and_publish_tabular_dataset will upload, publish, and validate your data automatically, without you having to worry about internal naming.
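A minimal sketch of this flow, assuming an existing DataRoomBuilder (builder) and an AuthenticationMethod (auth_method) obtained from the main SDK package; the table and column names are illustrative:

from decentriq_platform.sql import TabularDataNodeBuilder, PrimitiveType

data_node_builder = TabularDataNodeBuilder(
    "my_table",
    schema=[
        ("city", PrimitiveType.STRING, False),    # not nullable
        ("salary", PrimitiveType.FLOAT64, True),  # nullable
    ],
)

# Adds the data node, the schema-validation node, and the validation trigger node,
# together with the permissions required by the listed users.
data_node_id, verifier_node_id = data_node_builder.add_to_builder(
    builder,
    authentication=auth_method,
    users=["data-owner@example.com"],
)
# Publish raw data to `data_node_id`; downstream SQL nodes depend on `verifier_node_id`.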

#   TabularDataNodeBuilder( table_name: str, schema: List[Tuple[str, Any, bool]], *, is_required: bool = False )

Create a TabularDataNodeBuilder.

Parameters:

  • table_name: What your dataset should be called. This is the name you will later use in your SQL queries.
  • schema: The list of columns. This is a list of tuples, each containing the name of the column, the data type (decentriq_platform.sql.proto.PrimitiveType), and whether the column is nullable (can have empty values). The data type is an enum-like object with values PrimitiveType.STRING, PrimitiveType.INT64, PrimitiveType.FLOAT64 (see the sketch after this list).
  • is_required: Whether the dataset needs to be present for computations to be triggered.
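For example, a schema with a non-nullable string column and a nullable integer column could be written as follows (column names are illustrative):

from decentriq_platform.sql import PrimitiveType

schema = [
    ("user_id", PrimitiveType.STRING, False),  # must not contain empty values
    ("age", PrimitiveType.INT64, True),        # may contain empty values
]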
#   validation_computation_id

The id of the computation that will perform the data validation.

#   def add_to_builder( self, builder: decentriq_platform.builders.DataRoomBuilder, authentication: data_room_pb2.AuthenticationMethod, users: List[str] ) -> Tuple[str, str]:

Configure the given DataRoomBuilder to build the final data clean room with the necessary compute and data nodes. This call will also add the necessary permissions to the data room builder that let each user in the list of users perform the following tasks:

  1. Upload data to the data node.
  2. Use the data in downstream computations while making sure the schema is valid.
  3. Trigger the schema validation step separately as its own computation.

Parameters:

  • builder: The builder object to which to add the data and compute as well as the permissions.
  • authentication: The authentication method used to authenticate the users from within the enclave.
  • users: A list of email addresses that will be given permissions both for the validation of the data as well as the uploading of data.

Returns: A tuple containing, as the first element, the id of the data node, and, as a second element, the id of the verification computation. The data node id must be used when publishing data to the data node. The id of the verification computation must be used when depending on this dataset from downstream computations (e.g. SQL queries).
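Continuing the sketch from the class-level example above, the returned ids can be wired into a downstream SQL computation:

from decentriq_platform.sql import SqlCompute

data_node_id, verifier_node_id = data_node_builder.add_to_builder(
    builder, authentication=auth_method, users=["analyst@example.com"]
)

# Datasets are published to `data_node_id`; SQL nodes depend on `verifier_node_id`.
query_node = SqlCompute(
    name="row_count",
    sql_statement="SELECT COUNT(*) FROM my_table",
    dependencies=[("my_table", verifier_node_id)],
)
builder.add_compute_node(query_node, node_id="row_count")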

#   validation_permission: data_room_pb2.Permission

The permission required to trigger the data validation.

#   leaf_crud_permission: data_room_pb2.Permission

The permission required to upload the raw, non-validated data.

#   input_node_id: str

The node id to which data should be uploaded.

#   output_node_id: str

The id of the node from which the validated data can be read. This id needs to be specified in the list of dependencies of any downstream computations (such as SQL queries).
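If you manage permissions yourself instead of relying on add_to_builder, these properties can be passed to the builder directly; a sketch, assuming a TabularDataNodeBuilder instance (data_node_builder), a DataRoomBuilder (builder), and an AuthenticationMethod (auth_method):

builder.add_user_permission(
    email="data-owner@example.com",
    authentication_method=auth_method,
    permissions=[
        data_node_builder.leaf_crud_permission,   # upload the raw data
        data_node_builder.validation_permission,  # trigger the validation
    ],
)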

#   PrimitiveType = <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper...
#   def read_input_csv_file( path: str, /, *, has_header: bool = True, check_header: bool = True, encoding='utf-8', **kwargs ) -> _io.BytesIO:
def read_input_csv_file(
        path: str,
        /, *,
        has_header: bool = True,
        check_header: bool = True,
        encoding="utf-8",
        **kwargs
) -> io.BytesIO:
    """
    Read CSV from a file and turn it into a bytes array of the correct format so that it can be uploaded
    to the Decentriq platform.

    **Parameters**:
    - `path`: The path to the CSV file.
    - `has_header`: Whether the file contains a header row.
    - `check_header`: Whether the function should try to determine whether the file has a header.
        If the file has a header row but you didn't set the `has_header` flag, an
        exception will be raised. If you're sure that the way you use the function
        is correct, you can disable this check using this parameter.
    - `encoding`: The encoding of the CSV file. If you wrote the CSV file using a library like pandas,
        you need to check the documentation to see what encoding they use by default
        when writing files (likely `"utf-8"` in which case this can be left at its default value).
    - `delimiter`: The delimiter used in the CSV file. Defaults to the comma.
    - `**kwargs`: Additional keyword arguments passed to the Python CSV parser. Refer to the
        [official documentation](https://docs.python.org/3/library/csv.html#csv-fmt-params)
        for a list of supported arguments.

    **Returns**:
    A BytesIO object that can be passed to the methods responsible for uploading data.
    """
    with open(path, 'r', encoding=encoding) as csvfile:
        return _read_input_csv(
            csvfile,
            has_header=has_header,
            check_header=check_header,
            **kwargs,
        )

Read CSV from a file and turn it into a bytes array of the correct format so that it can be uploaded to the Decentriq platform.

Parameters:

  • path: The path to the CSV file.
  • has_header: Whether the file contains a header row.
  • check_header: Whether the function should try to determine whether the file has a header. If the file has a header row but you didn't set the has_header flag, an exception will be raised. If you're sure that the way you use the function is correct, you can disable this check using this parameter.
  • encoding: The encoding of the CSV file. If you wrote the CSV file using a library like pandas, you need to check the documentation to see what encoding they use by default when writing files (likely "utf-8" in which case this can be left at its default value).
  • delimiter: The delimiter used in the CSV file. Defaults to the comma.
  • **kwargs: Additional keyword arguments passed to the Python CSV parser. Refer to the official documentation for a list of supported arguments.

Returns: A BytesIO object that can be passed to the methods responsible for uploading data.
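A short sketch (the file path is illustrative):

from decentriq_platform.sql import read_input_csv_file

# Read a comma-separated file that includes a header row into the upload format.
data = read_input_csv_file("salaries.csv", has_header=True, delimiter=",")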

#   def read_input_csv_string( content: str, /, *, has_header: bool = True, check_header: bool = True, **kwargs ) -> _io.BytesIO:
def read_input_csv_string(
        content: str,
        /, *,
        has_header: bool = True,
        check_header: bool = True,
        **kwargs
) -> io.BytesIO:
    """
    Read CSV from a string and turn it into a bytes array of the correct format so that it can be uploaded
    to the Decentriq platform.

    **Parameters**:
    - `content`: The string containing the CSV file.
    - `has_header`: Whether the string contains a header row.
    - `check_header`: Whether the function should try to determine whether the file has a header.
        If the file has a header row but you didn't set the `has_header` flag, an
        exception will be raised. If you're sure that the way you use the function
        is correct, you can disable this check using this parameter.
    - `encoding`: The encoding of the source file. If you wrote the CSV file using a library like pandas,
        you need to check the documentation to see what encoding they use by default
        when writing files.
    - `delimiter`: The delimiter used in the CSV file. Defaults to the comma.
    - `**kwargs`: Additional keyword arguments passed to the Python CSV parser. Refer to the
        [official documentation](https://docs.python.org/3/library/csv.html#csv-fmt-params)
        for a list of supported arguments.

    **Returns**:
    A BytesIO object that can be passed to the methods responsible for uploading data.
    """
    return _read_input_csv(
        io.StringIO(content.strip()),
        has_header=has_header,
        check_header=check_header,
        **kwargs
    )

Read CSV from a string and turn it into a bytes array of the correct format so that it can be uploaded to the Decentriq platform.

Parameters:

  • content: The string containing the CSV file.
  • has_header: Whether the string contains a header row.
  • check_header: Whether the function should try to determine whether the file has a header. If the file has a header row but you didn't set the has_header flag, an exception will be raised. If you're sure that the way you use the function is correct, you can disable this check using this parameter.
  • encoding: The encoding of the source file. If you wrote the CSV file using a library like pandas, you need to check the documentation to see what encoding they use by default when writing files.
  • delimiter: The delimiter used in the CSV file. Defaults to the comma.
  • **kwargs: Additional keyword arguments passed to the Python CSV parser. Refer to the official documentation for a list of supported arguments.

Returns: A BytesIO object that can be passed to the methods responsible for uploading data.
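A short sketch using an inline CSV string:

from decentriq_platform.sql import read_input_csv_string

csv_content = """\
city,salary
Zurich,100000
Geneva,95000
"""
data = read_input_csv_string(csv_content, has_header=True)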

#   def upload_and_publish_tabular_dataset( data: _io.BytesIO, key: decentriq_platform.storage.Key, data_room_id: str, *, table: str, session: decentriq_platform.session.Session, description: str = '', validate: bool = True, **kwargs ) -> str:
def upload_and_publish_tabular_dataset(
        data: io.BytesIO,
        key: Key,
        data_room_id: str,
        *,
        table: str,
        session: Session,
        description: str = "",
        validate: bool = True,
        **kwargs
) -> str:
    """
    Convenience function for uploading data and validating the schema of the uploaded
    data.

    Validation of tabular data is a separate computation for which specific compute nodes
    need to be present in the compute graph defined by the data room definition.
    This function will take care of triggering the validation action for you.
    In case validation fails, an exception will be raised. Validation can be turned off
    using the `validate` parameter.

    **Parameters**:
    - `data`: The input data to be uploaded. Use one of the reader functions provided in this
        package to read CSV-like data.
    - `key`: A key for encrypting the data to-be-uploaded.
    - `data_room_id`: To which data room the dataset should be published. This is the id you
        get when publishing a data room.
    - `table`: The table name that was given to the corresponding `TabularDataNodeBuilder`.
    - `session`: The session with which to communicate with the enclave.
    - `description`: An optional description of the dataset.
    - `validate`: Whether to perform the validation operation.

    **Returns**:
    The manifest hash (dataset id) in case the upload and validation succeeded.
    """
    manifest_hash = session.client.upload_dataset(
        data,
        key,
        table,
        description=description
    )
    session.publish_dataset(
        data_room_id, manifest_hash,
        leaf_id=_data_node_id(table),
        key=key
    )

    if validate:
        try:
            job_id = session.run_computation(
                data_room_id,
                _noop_node_id(table)
            )
            session.wait_until_computation_has_finished(job_id, **kwargs)
        except Exception as e:
            raise Exception("Validation of dataset failed! Reason: {}".format(e))

    return manifest_hash

Convenience function for uploading data and validating the schema of the uploaded data.

Validation of tabular data is a separate computation for which specific compute nodes need to be present in the compute graph defined by the data room definition. This function will take care of triggering the validation action for you. In case validation fails, an exception will be raised. Validation can be turned off using the validate parameter.

Parameters:

  • data: The input data to be uploaded. Use one of the reader functions provided in this package to read CSV-like data.
  • key: A key for encrypting the data to-be-uploaded.
  • data_room_id: To which data room the dataset should be published. This is the id you get when publishing a data room.
  • table: The table name that was given to the corresponding TabularDataNodeBuilder.
  • session: The session with which to communicate with the enclave.
  • description: An optional description of the dataset.
  • validate: Whether to perform the validation operation.

Returns: The manifest hash (dataset id) in case the upload and validation succeeded.
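A sketch of the full upload flow, assuming an established enclave session, the id of an already published data room, and an encryption key created via the main package (the exact Key constructor is an assumption):

import decentriq_platform as dq
from decentriq_platform.sql import (
    read_input_csv_file,
    upload_and_publish_tabular_dataset,
)

data = read_input_csv_file("salaries.csv", has_header=True)
encryption_key = dq.Key()  # assumption: a fresh encryption key can be created like this

dataset_id = upload_and_publish_tabular_dataset(
    data,
    encryption_key,
    data_room_id,       # the id obtained when the data room was published
    table="my_table",   # the table_name used with the TabularDataNodeBuilder
    session=session,    # an existing enclave session
    description="Salary data",
    validate=True,
)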

#   def read_sql_query_result_as_string(result: bytes, include_header: bool = True) -> str:
def read_sql_query_result_as_string(
        result: bytes,
        include_header: bool = True
) -> str:
    """
    Read the given raw CSV output from an SQL query result and transform it into a string
    containing the full CSV file including the header row.
    The resulting string can be written directly to a file from where it can be read
    using your data analysis library of choice.

    **Parameters**:
    - `result`: The result from which to extract a CSV file as a string.
    - `include_header`: Whether to include an additional header row that contains
        column names (provided for example by using alias expressions
        in SELECT statements). If this setting is true, but no column
        names could be found, the names "V1", "V2", and so on will be used.
    """
    content, schema = _read_sql_query_result(result)

    if include_header:
        opt_columns = [col.name for col in schema.namedColumns]
        # Use V1, V2, ... for unknown column names => same format as R uses.
        replacement_columns = [f"V{ix}" for ix in range(1, len(opt_columns) + 1)]
        columns = [ c1 if c1 else c2 for c1, c2 in zip(opt_columns, replacement_columns)]
        returned_content = ','.join(columns) + '\n' + content
    else:
        returned_content = content

    # Remove trailing newlines
    return returned_content.strip()

Read the given raw CSV output from an SQL query result and transform it into a string containing the full CSV file including the header row. The resulting string can be written directly to a file from where it can be read using your data analysis library of choice.

Parameters:

  • result: The result from which to extract a CSV file as a string.
  • include_header: Whether to include an additional header row that contains column names (provided for example by using alias expressions in SELECT statements). If this setting is true, but no column names could be found, the names "V1", "V2", and so on will be used.
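Assuming the raw result bytes of the SQL computation have already been retrieved through the session (the retrieval call itself belongs to the main SDK package and is not shown here), the result can be turned into a CSV string and written to disk:

from decentriq_platform.sql import read_sql_query_result_as_string

csv_string = read_sql_query_result_as_string(raw_result, include_header=True)
with open("query_result.csv", "w") as f:
    f.write(csv_string)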
#   class SqlSchemaVerifier(decentriq_platform.node.Node):
class SqlSchemaVerifier(Node):
    """
    Computation node to validate an input and provide the necessary types.
    """

    def __init__(
        self,
        name: str,
        input_data_node: str,
        columns, # type: List[Tuple[str, PrimitiveType.V, bool]]
    ) -> None:
        named_columns = map(lambda c: NamedColumn(
            name=c[0],
            columnType=ColumnType(
                primitiveType=c[1],
                nullable=c[2]
                )
            ),
            columns
        )
        sql_worker_configuration = SqlWorkerConfiguration(
            validation=ValidationConfiguration(
                tableSchema=TableSchema(namedColumns=named_columns)
            )
        )
        config = serialize_length_delimited(sql_worker_configuration)

        super().__init__(
            name,
            config=config,
            enclave_type="decentriq.sql-worker",
            dependencies=[input_data_node],
            output_format=ComputeNodeFormat.ZIP
        )

Computation node to validate an input and provide the necessary types.
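This node is normally created for you by TabularDataNodeBuilder. If you need to construct it directly, a sketch could look as follows (the node ids mirror the naming convention described above and the columns are illustrative):

from decentriq_platform.sql import SqlSchemaVerifier, PrimitiveType

verifier = SqlSchemaVerifier(
    "my_table",                                 # id under which the validated data is read
    input_data_node="@table/my_table/dataset",  # the raw data node
    columns=[
        ("city", PrimitiveType.STRING, False),
        ("salary", PrimitiveType.FLOAT64, True),
    ],
)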

#   SqlSchemaVerifier(name: str, input_data_node: str, columns)