Python SDK

The Python package decentriq_platform provides all of the tools needed to interact with the Decentriq platform. Some extra functionality, such as the ability to define SQL computations to be run in an enclave, is provided via additional submodules, also called compute modules.

The package decentriq_platform can be used to

  • create DCRs (Data Clean Rooms)
  • encrypt and upload input data
  • trigger computation of authorized queries

The currently available compute modules are listed in the Available Modules section of this documentation, where you can also learn more about each of them.

Under API Documentation below you will also find a list of all the methods and classes provided as part of each module. Frequently used classes and methods are generally re-exported as part of the main namespace decentriq_platform, so that users of the SDK don't have to import functionality from a large set of submodules.

Note that some methods accept or return proto objects. These are objects used in the communication with our platform and, due to their special structure, they are documented separately under the Proto section. Each module exposes a .proto submodule that contains all the proto objects required by that particular module.

The source code of the Decentriq Python SDK can be found on GitHub.


Installation

Requirements

Make sure you are running:

  • Python 3.9 or above
  • One of the following operating systems: macOS (≥ 10.9), Windows (any 64-bit version), Linux (any 64-bit version)

Decentriq suggests using the SDK from a clean Python environment.

Follow the instructions below to create the environment and install the package:

  1. Open a terminal window.

  2. Create a new directory and navigate into it.

  3. Create a new virtual environment to install into. You can use, for instance, Python's own venv package:

    python -m venv env
    source env/bin/activate
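
    On Windows, activate the environment with env\Scripts\activate instead.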
    
  4. Make sure that you have the latest version of pip installed:

    pip install --upgrade pip
    
  5. Install the decentriq_platform package:

    pip install decentriq_platform
    
  6. Test that the installation was successful. The following shouldn't print an error:

    python -c "import decentriq_platform"
    

Available enclaves

The following enclave versions are currently available with the latest SDK version (v0.11.0):

  • decentriq.driver:v4: Latest version of the driver enclave, responsible for communicating with the SDK and orchestrating computation tasks.
  • decentriq.driver:v2: Previous version of the driver enclave (kept for backwards compatibility).
  • decentriq.sql-worker:v4: Latest version of the SQL worker capable of running SQL queries.
  • decentriq.sql-worker:v2: Previous stable version of the SQL worker enclave (kept for backwards compatibility).
  • decentriq.python-ml-worker:v2: An AWS Nitro-based enclave capable of running Python (version 3.9.12) code focused on machine learning. The following libraries are available within the enclave: numpy (1.21.5), scikit-learn (1.0.2), pandas (1.4.2), statsmodels (0.13.2), xgboost (1.5.2), auto-sklearn (0.14.7).
  • decentriq.python-synth-data-worker:v2: Another AWS Nitro-based enclave capable of producing synthetic data based on a source of potentially sensitive input data.
  • decentriq.r-latex-worker:v2: An AWS Nitro enclave for running code written in R (version 4.1.2). The enclave contains a wide array of R libraries, including several for the creation of LaTeX-based reports: data.table (1.14.2), knitr (1.36), rmarkdown (2.11), tinytex (0.34), Hmisc (4.6), pROC (1.18.0), ggplot2 (3.3.5), ggmosaic (0.3.3), dplyr (1.0.7), ggforce (0.3.3), kableExtra (1.3.4).

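A minimal sketch of selecting a subset of these enclaves for use with the SDK. It assumes the versions helper on the enclave_specifications registry exported from the main namespace; see the attestation module documentation for the exact interface.

    import decentriq_platform as dq

    # Select the latest driver and SQL worker from the list above
    # (assuming the `versions` helper on the exported registry).
    enclave_specs = dq.enclave_specifications.versions([
        "decentriq.driver:v4",
        "decentriq.sql-worker:v4",
    ])
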
View Source
"""
.. include:: ../../decentriq_platform_docs/gcg_getting_started.md
---
"""
__docformat__ = "restructuredtext"

from .client import Client, create_client, Session
from .platform import ClientPlatformFeatures, SessionPlatformFeatures
from .builders import (
    DataRoomBuilder,
    DataRoomCommitBuilder,
    GovernanceProtocol
)
from .compute import Noop, StaticContent
from .permission import Permissions
from .storage import Key
from .attestation import enclave_specifications, EnclaveSpecifications


__all__ = [
    "create_client",
    "Client",
    "Session",
    "DataRoomBuilder",
    "DataRoomCommitBuilder",
    "Permissions",
    "GovernanceProtocol",
    "enclave_specifications",
    "EnclaveSpecifications",
    "Key",
    "StaticContent",
    "Noop",
    "sql",
    "container",
    "storage",
    "attestation",
    "types",
    "authentication",
    "platform",
    "session",
    "node",
]
#   def create_client( user_email: str, api_token: str, *, integrate_with_platform: bool, client_id: str = 'MHyVW112w7Ql95G96fn9rnLWkYuOLmdk', api_host: str = 'api.decentriq.com', api_port: int = 443, api_use_tls: bool = True, api_core_host: Optional[str] = None, api_core_port: Optional[int] = None, api_core_use_tls: Optional[bool] = None, api_platform_host: Optional[str] = None, api_platform_port: Optional[int] = None, api_platform_use_tls: Optional[bool] = None, request_timeout: Optional[int] = None ) -> decentriq_platform.client.Client:
View Source
def create_client(
        user_email: str,
        api_token: str,
        *,
        integrate_with_platform: bool,
        client_id: str = DECENTRIQ_CLIENT_ID,
        api_host: str = DECENTRIQ_HOST,
        api_port: int = DECENTRIQ_PORT,
        api_use_tls: bool = DECENTRIQ_USE_TLS,
        api_core_host: Optional[str] = DECENTRIQ_API_CORE_HOST,
        api_core_port: Optional[int] = DECENTRIQ_API_CORE_PORT,
        api_core_use_tls: Optional[bool] = DECENTRIQ_API_CORE_USE_TLS,
        api_platform_host: Optional[str] = DECENTRIQ_API_PLATFORM_HOST,
        api_platform_port: Optional[int] = DECENTRIQ_API_PLATFORM_PORT,
        api_platform_use_tls: Optional[bool] = DECENTRIQ_API_PLATFORM_USE_TLS,
        request_timeout: Optional[int] = None
) -> Client:
    """
    The primary way to create a `Client` object.

    Client objects created using this method can optionally be integrated with the
    Decentriq UI.
    This means that certain additional features, such as being able to retrieve the list
    of data rooms that you participate in, will be made available via the `Client.platform` field.
    This flag also needs to be set if you want to use the authentication objects that let
    Decentriq act as the root CA that controls access to data rooms.

    In order to provide these features, the client will communicate directly with an API
    that exists outside of the confidential computing environment. This setting will only
    affect how metadata about data rooms (such as their name and id) is stored and retrieved,
    it will not in any way compromise the security of how your data is uploaded to the
    enclaves or how computed results are retrieved.

    **Parameters**:
    - `api_token`: An API token with which to authenticate oneself.
        The API token can be obtained in the user
        account settings in the Decentriq UI.
    - `user_email`: The email address of the user that generated the given API token.
    - `integrate_with_platform`: Whether to configure the client to integrate itself with our web platform.
        When this setting is set to False, none of the features provided via the
        `platform` field will be available.
    """
    api = API(
        api_token,
        client_id,
        api_core_host if api_core_host is not None else api_host,
        api_core_port if api_core_port is not None else api_port,
        api_prefix="/api/core",
        use_tls=api_core_use_tls if api_core_use_tls is not None else api_use_tls,
        timeout=request_timeout
    )

    if integrate_with_platform:
        platform = ClientPlatformFeatures(
            api_token,
            user_email,
            http_api=api,
            client_id=client_id,
            api_host=api_platform_host if api_platform_host is not None else api_host,
            api_port=api_platform_port if api_platform_port is not None else api_port,
            api_use_tls=api_platform_use_tls if api_platform_use_tls is not None else api_use_tls
        )
    else:
        platform = None

    return Client(
        user_email, api, platform, request_timeout=request_timeout
    )

The primary way to create a Client object.

Client objects created using this method can optionally be integrated with the Decentriq UI. This means that certain additional features, such as being able to retrieve the list of data rooms that you participate in, will be made available via the Client.platform field. This flag also needs to be set if you want to use the authentication objects that let Decentriq act as the root CA that controls access to data rooms.

In order to provide these features, the client will communicate directly with an API that exists outside of the confidential computing environment. This setting will only affect how metadata about data rooms (such as their name and id) is stored and retrieved, it will not in any way compromise the security of how your data is uploaded to the enclaves or how computed results are retrieved.

Parameters:

  • api_token: An API token with which to authenticate oneself. The API token can be obtained in the user account settings in the Decentriq UI.
  • user_email: The email address of the user that generated the given API token.
  • integrate_with_platform: Whether to configure the client to integrate itself with our web platform. When this setting is set to False, none of the features provided via the platform field will be available.
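
A minimal sketch of creating a client; the email address and API token below are placeholders for your own credentials:

    import decentriq_platform as dq

    # Placeholders: use your own account email and the API token
    # generated in the Decentriq UI account settings.
    client = dq.create_client(
        user_email="analyst@example.com",
        api_token="<API_TOKEN>",
        integrate_with_platform=True,
    )
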
#   class Client:
View Source
class Client:
    """
    A `Client` object allows you to upload datasets and to create `Session` objects that
    can communicate with enclaves and perform essential operations such as publishing
    data rooms, executing computations, and retrieving results.

    Objects of this class can be used to create and run data rooms, as well as to securely
    upload data and retrieve computation results.

    Objects of this class should be created using the `create_client` function.
    """

    _api: API
    _platform: Optional[ClientPlatformFeatures]

    def __init__(
            self,
            user_email: str,
            api: API,
            platform: Optional[ClientPlatformFeatures] = None,
            request_timeout: int = None
    ):
        """
        Create a client instance.

        Rather than creating `Client` instances directly using this constructor, use the function
        `create_client`.
        """
        self.user_email = user_email
        self._api = api
        self._platform = platform
        self.request_timeout = request_timeout

    def check_enclave_availability(self, specs: Dict[str, EnclaveSpecification]):
        """
        Check whether the selected enclaves are deployed at this moment.
        If one of the enclaves is not deployed, an exception will be raised.
        """
        available_specs =\
            {spec["proto"].SerializeToString() for spec in self._get_enclave_specifications()}
        for s in specs.values():
            if s["proto"].SerializeToString() not in available_specs:
                raise Exception(
                    "No available enclave deployed for attestation spec '{name}' (version {version})".\
                        format(name=s["name"], version=s["version"])
                )

    def _get_enclave_specifications(self) -> List[EnclaveSpecification]:
        url = Endpoints.SYSTEM_ATTESTATION_SPECS
        response: EnclaveSpecificationResponse = self._api.get(url).json()
        enclave_specs = []

        for spec_json in response["attestationSpecs"]:
            attestation_specification = AttestationSpecification()
            spec_length_delimited = base64.b64decode(spec_json["spec"])
            parse_length_delimited(spec_length_delimited, attestation_specification)
            enclave_spec = EnclaveSpecification(
                name=spec_json["name"],
                version=spec_json["version"],
                proto=attestation_specification,
                decoder=None,
                workerProtocols=[0],
            )
            enclave_specs.append(enclave_spec)

        return enclave_specs

    def create_session(
            self,
            auth: Auth,
            enclaves: Dict[str, EnclaveSpecification],
    ) -> Session:
        """
        Creates a new `decentriq_platform.session.Session` instance to communicate
        with a driver enclave.
        The passed set of enclave specifications must include a specification for
        a driver enclave.

        Messages sent through this session will be authenticated
        with the given authentication object.
        """
        url = Endpoints.SESSIONS
        if "decentriq.driver" not in enclaves:
            raise Exception(
                "Unable to find a specification for the driver enclave" +
                f" named 'decentriq.driver', you can get these specifications" +
                " from the main package."
            )
        driver_spec = enclaves["decentriq.driver"]

        if "clientProtocols" not in driver_spec:
            raise Exception(
                "Missing client supported protocol versions"
            )
        attestation_proto = driver_spec["proto"]
        client_protocols = driver_spec["clientProtocols"]

        attestation_specification_hash =\
            hashlib.sha256(serialize_length_delimited(attestation_proto)).hexdigest()

        req_body = CreateSessionRequest(
            attestationSpecificationHash=attestation_specification_hash
        )
        response: SessionJsonResponse = self._api.post(
                url,
                json.dumps(req_body),
                {"Content-type": "application/json"}
        ).json()

        platform_api =\
            self.platform._platform_api if self.is_integrated_with_platform else None

        session = Session(
                self,
                response["sessionId"],
                attestation_proto,
                client_protocols,
                auth=auth,
                platform_api=platform_api,
        )

        return session

    def _create_scope(self, email: str, metadata: Dict[str, str]) -> str:
        url = Endpoints.USER_SCOPES_COLLECTION.replace(":userId", email)
        req_body = CreateScopeRequest(metadata=metadata)
        response: ScopeJson = self._api.post(
                url,
                json.dumps(req_body),
                {"Content-type": "application/json"}
        ).json()
        return response["scopeId"]

    def get_scope(self, email: str, scope_id: str) -> ScopeJson:
        url = Endpoints.USER_SCOPE \
                .replace(":userId", email) \
                .replace(":scopeId", scope_id)
        response: ScopeJson = self._api.get(url).json()
        return response

    def get_scope_by_metadata(self, email: str, metadata: Dict[str, str]) -> Optional[str]:
        url = Endpoints.USER_SCOPES_COLLECTION.replace(":userId", email)
        response: List[ScopeJson] = self._api.get(
                url,
                params={"metadata": json.dumps(metadata)}
            ).json()
        if len(response) == 0:
            return None
        else:
            scope = response[0]
            return scope["scopeId"]

    def _ensure_scope_with_metadata(self, email: str, metadata: Dict[str, str]) -> str:
        scope = self.get_scope_by_metadata(email, metadata)
        if scope is None:
            scope = self._create_scope(email, metadata)
        return scope

    def delete_scope(self, email: str, scope_id: str):
        url = Endpoints.USER_SCOPE \
            .replace(":userId", email) \
            .replace(":scopeId", scope_id)
        self._api.delete(url)

    def upload_dataset(
            self,
            data: BinaryIO,
            key: Key,
            file_name: str,
            /, *,
            description: str = "",
            chunk_size: int = 8 * 1024 ** 2,
            parallel_uploads: int = 8,
            owner_email: Optional[str] = None,
    ) -> str:
        """
        Uploads `data` as a file usable by enclaves and returns the
        corresponding manifest hash.

        **Parameters**:
        - `data`: The data to upload as a buffered stream.
            Such an object can be obtained by wrapping a binary string in a `io.BytesIO()`
            object or, if reading from a file, by using `with open(path, "rb") as file`.
        - `key`: Encryption key used to encrypt the file.
        - `file_name`: Name of the file.
        - `description`: An optional file description.
        - `chunk_size`: Size (in bytes) of the chunks into which the stream is split.
        - `parallel_uploads`: The number of chunks to upload in parallel.
        - `owner_email`: Owner of the file if different from the one already specified
            when creating the client object.
        """
        uploader = BoundedExecutor(
                bound=parallel_uploads * 2,
                max_workers=parallel_uploads
        )
        email = owner_email if owner_email else self.user_email
        # create and upload chunks
        chunker = Chunker(data, chunk_size=chunk_size)
        chunk_hashes: List[str] = []
        chunk_uploads_futures = []
        upload_description = self._create_upload(email)
        for chunk_hash, chunk_data in chunker:
            chunk_uploads_futures.append(
                uploader.submit(
                    self._encrypt_and_upload_chunk,
                    chunk_hash,
                    chunk_data,
                    key.material,
                    email,
                    upload_description["uploadId"]
                )
            )
            chunk_hashes.append(chunk_hash.hex())

        # check chunks uploads were successful
        completed, pending = futures.wait(
                chunk_uploads_futures,
                None,
                futures.FIRST_EXCEPTION
            )
        if len(pending):
            # re-raise exception
            for future in completed: future.result()
        uploader.shutdown(wait=False)

        # create manifest and upload
        manifest_hash, manifest_encrypted = create_encrypted_chunk(
                key.material,
                os.urandom(16),
                json.dumps(chunk_hashes).encode("utf-8")
        )
        scope_id = self._ensure_scope_with_metadata(email, {"type": ScopeTypes.USER_FILE, "manifest_hash": manifest_hash.hex() })
        self._finalize_upload(
            user_id=email,
            scope_id=scope_id,
            upload_id=upload_description["uploadId"],
            name=file_name,
            manifest_hash=manifest_hash,
            manifest_encrypted=manifest_encrypted,
            chunks=chunk_hashes
        )

        manifest_hash_hex = manifest_hash.hex()

        if self.is_integrated_with_platform:
            self.platform._platform_api.save_dataset_metadata(
                manifest_hash_hex,
                file_name=file_name,
                description=description,
                owner_email=self.user_email
            )

        return manifest_hash_hex

    def _encrypt_and_upload_chunk(
            self,
            chunk_hash: bytes,
            chunk_data: bytes,
            key: bytes,
            user_id: str,
            upload_id: str
    ):
        cipher = StorageCipher(key)
        chunk_data_encrypted = cipher.encrypt(chunk_data)
        self._upload_chunk(chunk_hash, chunk_data_encrypted, user_id, upload_id)

    def _create_upload(self, user_id: str) -> UploadDescription:
        url = Endpoints.USER_UPLOADS_COLLECTION.replace(":userId", user_id)
        response = self._api.post(url, {}, {"Content-type": "application/json"})
        upload_description: UploadDescription = response.json()
        return upload_description

    def _upload_chunk(
            self,
            chunk_hash: bytes,
            chunk_data_encrypted: bytes,
            user_id: str,
            upload_id: str
    ):
        url = Endpoints.USER_UPLOAD_CHUNKS \
            .replace(":userId", user_id) \
            .replace(":uploadId", upload_id)
        wrapped_chunk= ChunkWrapper(
            hash=chunk_hash.hex(),
            data=b64encode(chunk_data_encrypted).decode("ascii")
        )
        self._api.post(url, json.dumps(wrapped_chunk), {"Content-type": "application/json"})

    def _delete_user_upload(self, email: str, upload_id: str):
        url = Endpoints.USER_UPLOAD \
            .replace(":userId", email) \
            .replace(":uploadId", upload_id)
        self._api.delete(url)

    def _get_user_upload(self, email: str, upload_id: str) -> UploadDescription:
        url = Endpoints.USER_UPLOAD.replace(
                ":userId", email
            ).replace(":uploadId", upload_id)
        response = self._api.get(url)
        return response.json()

    def _get_user_uploads_collection(self, email: str) -> List[UploadDescription]:
        url = Endpoints.USER_UPLOADS_COLLECTION.replace(":userId", email)
        response = self._api.get(url)
        return response.json()

    def _finalize_upload(
            self,
            user_id: str,
            scope_id: str,
            upload_id: str,
            name: str,
            manifest_hash: bytes,
            manifest_encrypted: bytes,
            chunks: List[str]
    ) -> DatasetDescription:
        url = Endpoints.USER_FILES \
            .replace(":userId", user_id)
        payload = FinalizeUpload(
            uploadId=upload_id,
            manifest=b64encode(manifest_encrypted).decode("ascii"),
            manifestHash=manifest_hash.hex(),
            name=name,
            chunks=chunks,
            scopeId=scope_id
        )
        dataset_description: DatasetDescription = self._api.post(
            url,
            json.dumps(payload),
            {"Content-type": "application/json"}
        ).json()
        return dataset_description

    def get_dataset(
            self,
            manifest_hash: str
    ) -> Optional[DatasetDescription]:
        """
        Returns information about a user file given a dataset id.
        """
        url = Endpoints.USER_FILE \
            .replace(":userId", self.user_email) \
            .replace(":manifestHash", manifest_hash)
        try:
            response = self._api.get(url).json()
            return DatasetDescription(
                datasetId=response["manifestHash"],
                name=response["filename"],
                creationDate=response["creationDate"],
            )
        except ServerError:
            return None

    def get_all_datasets(self) -> List[DatasetDescription]:
        """
        Returns the list of files uploaded by the user.
        """
        url = Endpoints.USER_FILES \
            .replace(":userId", self.user_email)
        response = self._api.get(url)
        data = response.json()
        result = []
        for dataset in data:
            result.append(
                DatasetDescription(
                    datasetId=dataset["manifestHash"],
                    name=dataset["filename"],
                    creationDate=dataset["creationDate"],
                )
            )
        return result

    def delete_dataset(self, manifest_hash: str, force: bool = False):
        """
        Deletes the dataset with the given id from the Decentriq platform.

        In case the dataset is still published to one or more data rooms,
        an exception will be thrown and the dataset will need to be
        unpublished manually from the respective data rooms using
        `Session.remove_published_dataset`.
        This behavior can be circumvented by using the `force` flag.
        Note, however, that this might put some data rooms in a broken
        state as they might try to read data that does not exist anymore.
        """
        if self.is_integrated_with_platform:
            data_room_ids = self.platform._platform_api.\
                get_data_rooms_with_published_dataset(manifest_hash)
            if data_room_ids:
                list_of_ids = "\n".join([f"- {dcr_id}" for dcr_id in data_room_ids])
                if force:
                    print(
                        "This dataset is published to the following data rooms."
                        " These data rooms might be in a broken state now:"
                        f"\n{list_of_ids}"
                    )
                else:
                    raise Exception(
                        "This dataset is published to the following data rooms"
                        " and needs to be unpublished before it can be deleted!"
                        f"\n{list_of_ids}"
                    )

        url = Endpoints.USER_FILE \
            .replace(":userId", self.user_email) \
            .replace(":manifestHash", manifest_hash)
        self._api.delete(url)

        if self.is_integrated_with_platform:
            try:
                self.platform._platform_api.delete_dataset_metadata(manifest_hash)
            except Exception as e:
                print(f"Error when deleting dataset: {e}")

    @property
    def platform(self) -> ClientPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the Decentriq platform.

        This field exposes an object that provides a set of features known from the Decentriq
        web platform.
        """
        if self._platform:
            return self._platform
        else:
            raise Exception(
                "This field is not set as the client has not been configured with integration"
                " with the web platform."
            )

    @property
    def is_integrated_with_platform(self) -> bool:
        """Whether this client has been created with platform integration."""
        return self._platform is not None

A Client object allows you to upload datasets and to create Session objects that can communicate with enclaves and perform essential operations such as publishing data rooms, executing computations, and retrieving results.

Objects of this class can be used to create and run data rooms, as well as to securely upload data and retrieve computation results.

Objects of this class should be created using the create_client function.

#   Client( user_email: str, api: decentriq_platform.api.API, platform: Optional[decentriq_platform.platform.ClientPlatformFeatures] = None, request_timeout: int = None )
View Source
    def __init__(
            self,
            user_email: str,
            api: API,
            platform: Optional[ClientPlatformFeatures] = None,
            request_timeout: int = None
    ):
        """
        Create a client instance.

        Rather than creating `Client` instances directly using this constructor, use the function
        `create_client`.
        """
        self.user_email = user_email
        self._api = api
        self._platform = platform
        self.request_timeout = request_timeout

Create a client instance.

Rather than creating Client instances directly using this constructor, use the function create_client.

#   def check_enclave_availability( self, specs: Dict[str, decentriq_platform.types.EnclaveSpecification] ):
View Source
    def check_enclave_availability(self, specs: Dict[str, EnclaveSpecification]):
        """
        Check whether the selected enclaves are deployed at this moment.
        If one of the enclaves is not deployed, an exception will be raised.
        """
        available_specs =\
            {spec["proto"].SerializeToString() for spec in self._get_enclave_specifications()}
        for s in specs.values():
            if s["proto"].SerializeToString() not in available_specs:
                raise Exception(
                    "No available enclave deployed for attestation spec '{name}' (version {version})".\
                        format(name=s["name"], version=s["version"])
                )

Check whether the selected enclaves are deployed at this moment. If one of the enclaves is not deployed, an exception will be raised.
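
For example, assuming client and enclave_specs were obtained as sketched earlier, the selected enclaves can be checked up front; the call raises if any of them is not currently deployed:

    # Raises an exception if one of the selected enclaves is not deployed.
    client.check_enclave_availability(enclave_specs)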

View Source
    def create_session(
            self,
            auth: Auth,
            enclaves: Dict[str, EnclaveSpecification],
    ) -> Session:
        """
        Creates a new `decentriq_platform.session.Session` instance to communicate
        with a driver enclave.
        The passed set of enclave specifications must include a specification for
        a driver enclave.

        Messages sent through this session will be authenticated
        with the given authentication object.
        """
        url = Endpoints.SESSIONS
        if "decentriq.driver" not in enclaves:
            raise Exception(
                "Unable to find a specification for the driver enclave" +
                f" named 'decentriq.driver', you can get these specifications" +
                " from the main package."
            )
        driver_spec = enclaves["decentriq.driver"]

        if "clientProtocols" not in driver_spec:
            raise Exception(
                "Missing client supported protocol versions"
            )
        attestation_proto = driver_spec["proto"]
        client_protocols = driver_spec["clientProtocols"]

        attestation_specification_hash =\
            hashlib.sha256(serialize_length_delimited(attestation_proto)).hexdigest()

        req_body = CreateSessionRequest(
            attestationSpecificationHash=attestation_specification_hash
        )
        response: SessionJsonResponse = self._api.post(
                url,
                json.dumps(req_body),
                {"Content-type": "application/json"}
        ).json()

        platform_api =\
            self.platform._platform_api if self.is_integrated_with_platform else None

        session = Session(
                self,
                response["sessionId"],
                attestation_proto,
                client_protocols,
                auth=auth,
                platform_api=platform_api,
        )

        return session

Creates a new decentriq_platform.session.Session instance to communicate with a driver enclave. The passed set of enclave specifications must include a specification for a driver enclave.

Messages sent through this session will be authenticated with the given authentication object.
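
A sketch of opening a session, assuming enclave_specs contains a decentriq.driver specification and auth is an Auth object obtained via the authentication module (its creation is not shown here):

    # `auth`: an Auth object from the `authentication` module (creation not shown).
    session = client.create_session(auth, enclave_specs)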

#   def get_scope( self, email: str, scope_id: str ) -> decentriq_platform.types.ScopeJson:
View Source
    def get_scope(self, email: str, scope_id: str) -> ScopeJson:
        url = Endpoints.USER_SCOPE \
                .replace(":userId", email) \
                .replace(":scopeId", scope_id)
        response: ScopeJson = self._api.get(url).json()
        return response
#   def get_scope_by_metadata(self, email: str, metadata: Dict[str, str]) -> Optional[str]:
View Source
    def get_scope_by_metadata(self, email: str, metadata: Dict[str, str]) -> Optional[str]:
        url = Endpoints.USER_SCOPES_COLLECTION.replace(":userId", email)
        response: List[ScopeJson] = self._api.get(
                url,
                params={"metadata": json.dumps(metadata)}
            ).json()
        if len(response) == 0:
            return None
        else:
            scope = response[0]
            return scope["scopeId"]
#   def delete_scope(self, email: str, scope_id: str):
View Source
    def delete_scope(self, email: str, scope_id: str):
        url = Endpoints.USER_SCOPE \
            .replace(":userId", email) \
            .replace(":scopeId", scope_id)
        self._api.delete(url)
#   def upload_dataset( self, data: <class 'BinaryIO'>, key: decentriq_platform.storage.Key, file_name: str, /, *, description: str = '', chunk_size: int = 8388608, parallel_uploads: int = 8, owner_email: Optional[str] = None ) -> str:
View Source
    def upload_dataset(
            self,
            data: BinaryIO,
            key: Key,
            file_name: str,
            /, *,
            description: str = "",
            chunk_size: int = 8 * 1024 ** 2,
            parallel_uploads: int = 8,
            owner_email: Optional[str] = None,
    ) -> str:
        """
        Uploads `data` as a file usable by enclaves and returns the
        corresponding manifest hash.

        **Parameters**:
        - `data`: The data to upload as a buffered stream.
            Such an object can be obtained by wrapping a binary string in a `io.BytesIO()`
            object or, if reading from a file, by using `with open(path, "rb") as file`.
        - `key`: Encryption key used to encrypt the file.
        - `file_name`: Name of the file.
        - `description`: An optional file description.
        - `chunk_size`: Size (in bytes) of the chunks into which the stream is split.
        - `parallel_uploads`: The number of chunks to upload in parallel.
        - `owner_email`: Owner of the file if different from the one already specified
            when creating the client object.
        """
        uploader = BoundedExecutor(
                bound=parallel_uploads * 2,
                max_workers=parallel_uploads
        )
        email = owner_email if owner_email else self.user_email
        # create and upload chunks
        chunker = Chunker(data, chunk_size=chunk_size)
        chunk_hashes: List[str] = []
        chunk_uploads_futures = []
        upload_description = self._create_upload(email)
        for chunk_hash, chunk_data in chunker:
            chunk_uploads_futures.append(
                uploader.submit(
                    self._encrypt_and_upload_chunk,
                    chunk_hash,
                    chunk_data,
                    key.material,
                    email,
                    upload_description["uploadId"]
                )
            )
            chunk_hashes.append(chunk_hash.hex())

        # check chunks uploads were successful
        completed, pending = futures.wait(
                chunk_uploads_futures,
                None,
                futures.FIRST_EXCEPTION
            )
        if len(pending):
            # re-raise exception
            for future in completed: future.result()
        uploader.shutdown(wait=False)

        # create manifest and upload
        manifest_hash, manifest_encrypted = create_encrypted_chunk(
                key.material,
                os.urandom(16),
                json.dumps(chunk_hashes).encode("utf-8")
        )
        scope_id = self._ensure_scope_with_metadata(email, {"type": ScopeTypes.USER_FILE, "manifest_hash": manifest_hash.hex() })
        self._finalize_upload(
            user_id=email,
            scope_id=scope_id,
            upload_id=upload_description["uploadId"],
            name=file_name,
            manifest_hash=manifest_hash,
            manifest_encrypted=manifest_encrypted,
            chunks=chunk_hashes
        )

        manifest_hash_hex = manifest_hash.hex()

        if self.is_integrated_with_platform:
            self.platform._platform_api.save_dataset_metadata(
                manifest_hash_hex,
                file_name=file_name,
                description=description,
                owner_email=self.user_email
            )

        return manifest_hash_hex

Uploads data as a file usable by enclaves and returns the corresponding manifest hash.

Parameters:

  • data: The data to upload as a buffered stream. Such an object can be obtained by wrapping a binary string in a io.BytesIO() object or, if reading from a file, by using with open(path, "rb") as file.
  • key: Encryption key used to encrypt the file.
  • file_name: Name of the file.
  • description: An optional file description.
  • chunk_size: Size (in bytes) of the chunks into which the stream is split.
  • parallel_uploads: The number of chunks to upload in parallel.
  • owner_email: Owner of the file if different from the one already specified when creating the client object.
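
A sketch of uploading a local CSV file. It assumes that calling Key() without arguments generates a fresh random encryption key (see the storage module) and that data.csv is a hypothetical file path:

    from decentriq_platform import Key

    # Assumption: the no-argument constructor generates a fresh random key.
    encryption_key = Key()

    with open("data.csv", "rb") as data_file:
        manifest_hash = client.upload_dataset(
            data_file,
            encryption_key,
            "data.csv",
            description="Example upload",
        )
    print(manifest_hash)
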
#   def get_dataset( self, manifest_hash: str ) -> Optional[decentriq_platform.types.DatasetDescription]:
View Source
    def get_dataset(
            self,
            manifest_hash: str
    ) -> Optional[DatasetDescription]:
        """
        Returns information about a user file given a dataset id.
        """
        url = Endpoints.USER_FILE \
            .replace(":userId", self.user_email) \
            .replace(":manifestHash", manifest_hash)
        try:
            response = self._api.get(url).json()
            return DatasetDescription(
                datasetId=response["manifestHash"],
                name=response["filename"],
                creationDate=response["creationDate"],
            )
        except ServerError:
            return None

Returns information about a user file given a dataset id.
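
For example, the manifest hash returned by upload_dataset can be used to look the dataset up again; the returned DatasetDescription is built from JSON fields and can be read like a dictionary:

    info = client.get_dataset(manifest_hash)
    if info is not None:
        print(info["name"], info["creationDate"])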

#   def get_all_datasets(self) -> List[decentriq_platform.types.DatasetDescription]:
View Source
    def get_all_datasets(self) -> List[DatasetDescription]:
        """
        Returns the list of files uploaded by the user.
        """
        url = Endpoints.USER_FILES \
            .replace(":userId", self.user_email)
        response = self._api.get(url)
        data = response.json()
        result = []
        for dataset in data:
            result.append(
                DatasetDescription(
                    datasetId=dataset["manifestHash"],
                    name=dataset["filename"],
                    creationDate=dataset["creationDate"],
                )
            )
        return result

Returns the list of files uploaded by the user.
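
A short sketch listing every dataset uploaded by the current user:

    for dataset in client.get_all_datasets():
        print(dataset["datasetId"], dataset["name"], dataset["creationDate"])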

#   def delete_dataset(self, manifest_hash: str, force: bool = False):
View Source
    def delete_dataset(self, manifest_hash: str, force: bool = False):
        """
        Deletes the dataset with the given id from the Decentriq platform.

        In case the dataset is still published to one or more data rooms,
        an exception will be thrown and the dataset will need to be
        unpublished manually from the respective data rooms using
        `Session.remove_published_dataset`.
        This behavior can be circumvented by using the `force` flag.
        Note, however, that this might put some data rooms in a broken
        state as they might try to read data that does not exist anymore.
        """
        if self.is_integrated_with_platform:
            data_room_ids = self.platform._platform_api.\
                get_data_rooms_with_published_dataset(manifest_hash)
            if data_room_ids:
                list_of_ids = "\n".join([f"- {dcr_id}" for dcr_id in data_room_ids])
                if force:
                    print(
                        "This dataset is published to the following data rooms."
                        " These data rooms might be in a broken state now:"
                        f"\n{list_of_ids}"
                    )
                else:
                    raise Exception(
                        "This dataset is published to the following data rooms"
                        " and needs to be unpublished before it can be deleted!"
                        f"\n{list_of_ids}"
                    )

        url = Endpoints.USER_FILE \
            .replace(":userId", self.user_email) \
            .replace(":manifestHash", manifest_hash)
        self._api.delete(url)

        if self.is_integrated_with_platform:
            try:
                self.platform._platform_api.delete_dataset_metadata(manifest_hash)
            except Exception as e:
                print(f"Error when deleting dataset: {e}")

Deletes the dataset with the given id from the Decentriq platform.

In case the dataset is still published to one or more data rooms, an exception will be thrown and the dataset will need to be unpublished manually from the respective data rooms using Session.remove_published_dataset. This behavior can be circumvented by using the force flag. Note, however, that this might put some data rooms in a broken state as they might try to read data that does not exist anymore.
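
A sketch of deleting a dataset by its manifest hash; without the force flag the call fails if the dataset is still published to a data room:

    # Fails with an exception if the dataset is still published somewhere.
    client.delete_dataset(manifest_hash)

    # Force deletion anyway (may leave data rooms in a broken state):
    # client.delete_dataset(manifest_hash, force=True)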

Provider of a list of convenience methods to interact with the Decentriq platform.

This field exposes an object that provides a set of features known from the Decentriq web platform.

#   is_integrated_with_platform: bool

Whether this client has been created with platform integration.

#   class Session:
View Source
class Session():
    """
    Class for managing the communication with an enclave.
    """
    client: Client
    session_id: str
    enclave_identifier: str
    auth: Auth
    email: str
    keypair: Any
    fatquote: Fatquote
    quote: QuoteBody
    driver_attestation_specification: AttestationSpecification
    client_protocols: List[int]

    def __init__(
            self,
            client: Client,
            session_id: str,
            driver_attestation_specification: AttestationSpecification,
            client_protocols: List[int],
            auth: Auth,
            platform_api: Optional[PlatformApi] = None,
   ):
        """
        `Session` instances should not be instantiated directly but rather
         be created using a `Client` object via `decentriq_platform.Client.create_session`.
        """
        url = Endpoints.SESSION_FATQUOTE.replace(":sessionId", session_id)
        response: FatquoteResBody = client._api.get(url).json()
        fatquote_bytes = b64decode(response["fatquoteBase64"])
        fatquote = Fatquote()
        fatquote.ParseFromString(fatquote_bytes)
        verification = Verification(attestation_specification=driver_attestation_specification)
        report_data = verification.verify(fatquote)
        self.client = client
        self.session_id = session_id
        self.auth = auth
        self.email = auth.user_id
        self.keypair = chily.Keypair.from_random()
        self.fatquote = fatquote
        self.report_data = report_data
        self.driver_attestation_specification = driver_attestation_specification
        self.client_protocols = client_protocols

        if platform_api:
            self._platform = SessionPlatformFeatures(self, platform_api)
        else:
            self._platform = None

    def _get_client_protocol(self, endpoint_protocols: List[int]) -> int:
        try:
            protocol = max(set(self.client_protocols) & set(endpoint_protocols))
            return protocol
        except ValueError:
            min_enclave_version = min(self.client_protocols)
            max_endpoint_version = max(endpoint_protocols)
            exception_message = "Endpoint is only available with protocol versions {} but the enclave only supports {}\n".format(
                endpoint_protocols,
                self.client_protocols
            )
            if min_enclave_version > max_endpoint_version:
                exception_message += "Try upgrading to a newer version of the SDK"
            else:
                exception_message += "Try using an older version of the SDK"
            raise Exception(exception_message)

    def _get_enclave_pubkey(self):
        pub_keyB = bytearray(self.report_data[:32])
        return chily.PublicKey.from_bytes(pub_keyB)

    def _encrypt_and_encode_data(self, data: bytes, auth: Auth) -> bytes:
        nonce = chily.Nonce.from_random()
        cipher = chily.Cipher(
            self.keypair.secret, self._get_enclave_pubkey()
        )
        enc_data = cipher.encrypt("client sent session data", data, nonce)
        public_keys = bytes(self.keypair.public_key.bytes) + bytes(self._get_enclave_pubkey().bytes)
        signature = auth._sign(public_keys)
        shared_key = bytes(self.keypair.secret.diffie_hellman(self._get_enclave_pubkey()).bytes)
        hkdf = HKDF(algorithm=hashes.SHA512(), length=64, info=b"IdP KDF Context", salt=b"")
        mac_key = hkdf.derive(shared_key)
        mac_tag = hmac.digest(mac_key, auth._get_user_id().encode(), "sha512")
        sigma_auth = Sigma(signature, mac_tag, auth)
        return datanoncepubkey_to_message(
            bytes(enc_data),
            bytes(nonce.bytes),
            bytes(self.keypair.public_key.bytes),
            sigma_auth
        )

    def _decode_and_decrypt_data(self, data: bytes) -> bytes:
        dec_data, nonceB, _ = message_to_datanoncepubkey(data)
        cipher = chily.Cipher(
            self.keypair.secret, self._get_enclave_pubkey()
        )
        return cipher.decrypt("client received session data", dec_data, chily.Nonce.from_bytes(nonceB))

    def send_request(
            self,
            request: GcgRequest,
            protocol: int,
    ) -> List[GcgResponse]:
        """
        Low-level method for sending a raw `GcgRequest` to the enclave.
        Use this method if any of the convenience methods (such as `run_computation`) don't perform
        the exact task you want.
        """
        gcg_protocol = serialize_length_delimited(
            ComputeNodeProtocol(
                version=protocol
            )
        )
        serialized_request = serialize_length_delimited(
            Request(
                deltaRequest= self._encrypt_and_encode_data(
                    gcg_protocol + serialize_length_delimited(request),
                    self.auth
                )
            )
        )
        url = Endpoints.SESSION_MESSAGES.replace(":sessionId", self.session_id)
        enclave_request = EnclaveMessage(data=b64encode(serialized_request).decode("ascii"))
        enclave_response: bytes = self.client._api.post(
            url, json.dumps(enclave_request), {"Content-type": "application/json", "Accept-Version": "2"}
        ).content

        responses: List[GcgResponse] = []
        offset = 0
        while offset < len(enclave_response):
            response_container = Response()
            offset += parse_length_delimited(enclave_response[offset:], response_container)
            if response_container.HasField("unsuccessfulResponse"):
                raise Exception(response_container.unsuccessfulResponse)
            else:
                response = GcgResponse()
                decrypted_response = self._decode_and_decrypt_data(
                    response_container.successfulResponse
                )
                response_protocol = ComputeNodeProtocol()
                response_offset = parse_length_delimited(
                        decrypted_response, response_protocol
                )
                if response_protocol.version != protocol:
                    raise Exception("Different response protocol version than requested")
                parse_length_delimited(decrypted_response[response_offset:], response)
                if response.HasField("failure"):
                    raise Exception(response.failure)
                responses.append(response)
        return responses

    def _publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]],
    ) -> CreateDataRoomResponse:
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        data_room, conf_modifications = data_room_definition
        if not data_room.ownerEmail:
            data_room.ownerEmail = self.email
        request = CreateDataRoomRequest(
            dataRoom=data_room,
            initialConfiguration=conf_modifications,
        )
        responses = self.send_request(GcgRequest(createDataRoomRequest=request), protocol)
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]

        if response.HasField("createDataRoomResponse"):
            if response.createDataRoomResponse.HasField("dataRoomValidationError"):
                raise Exception(
                    "Error when validating data room: {} (compute node id '{}')".format(
                        response.createDataRoomResponse.dataRoomValidationError.message,
                        response.createDataRoomResponse.dataRoomValidationError.computeNodeId
                    )
                )
            else:
                if self.is_integrated_with_platform:
                    data_room_hash = response.createDataRoomResponse.dataRoomId
                    self._create_data_room_in_platform(data_room_definition, data_room_hash)
        else:
            raise Exception(
                "Expected createDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.createDataRoomResponse

    def publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]]
    ) -> str:
        """
        Create a data room with the provided protobuf configuration object
        and have the enclave apply the given list of modifications to the data
        room configuration.

        The id returned from this method will be used when interacting with the
        published data room (for example when running computations or publishing
        datasets).
        """
        response = self._publish_data_room(data_room_definition)
        return _get_data_room_id(response).hex()

    def publish_data_room_configuration_commit(
            self,
            configuration_commit: ConfigurationCommit
    ) -> str:
        """
        Publish the given data room configuration commit.

        Configuration commits can be built using a `DataRoomCommitBuilder` object.

        The id returned from this method will be used when running development
        computations or when trying to merge this commit into the main
        data room configuration.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": configuration_commit.dataRoomId.hex(),
            }
        )
        request = CreateConfigurationCommitRequest(
            commit=configuration_commit,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(createConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("createConfigurationCommitResponse"):
            raise Exception(
                "Expected createConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.createConfigurationCommitResponse.commitId.hex()

    def retrieve_configuration_commit(
        self,
        configuration_commit_id: str,
    ) -> ConfigurationCommit:
        """
        Retrieve the content of given configuration commit id.

        **Returns**:
        A `ConfigurationCommit`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitResponse.commit

    def retrieve_configuration_commit_approvers(
        self,
        configuration_commit_id: str,
    ) -> List[str]:
        """
        Retrieve the list of users who need to approve the merger of a given
        configuration commit.

        **Returns**:
        A list of ids belonging to the users that need to approve the
        configuration commit.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitApproversRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitApproversRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitApproversResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitApproversResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitApproversResponse.approvers


    def generate_merge_approval_signature(
            self,
            configuration_commit_id: str
    ) -> bytes:
        """
        Generate an approval signature required for merging a configuration
        commit.

        To merge a specific configuration commit, each user referenced in the list
        of ids returned by `retrieveConfigurationCommitApprovers` needs to
        generate an approval signature using this method.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = GenerateMergeApprovalSignatureRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(generateMergeApprovalSignatureRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("generateMergeApprovalSignatureResponse"):
            raise Exception(
                "Expected generateMergeApprovalSignatureResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.generateMergeApprovalSignatureResponse.signature

    def merge_configuration_commit(
        self,
        configuration_commit_id: str,
        approval_signatures: Dict[str, bytes],
    ) -> MergeConfigurationCommitResponse:
        """
        Request the enclave to merge the given configuration commit into the
        main data room configuration.

        **Parameters**:
        - `configuration_commit_id`: The id of the commit to be merged.
        - `approval_signatures`: A dictionary containing the approval signature for
            each of the required approvers, e.g. `{ "some@email.com": signature }`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = MergeConfigurationCommitRequest(
            scope=bytes.fromhex(scope_id),
            commitId=bytes.fromhex(configuration_commit_id),
            approvalSignatures=approval_signatures,
        )
        responses = self.send_request(
            GcgRequest(mergeConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("mergeConfigurationCommitResponse"):
            raise Exception(
                "Expected mergeConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.mergeConfigurationCommitResponse

    def retrieve_data_room_configuration_history(
        self,
        data_room_id: str
    ) -> RetrieveDataRoomConfigurationHistoryResponse:
        """
        Retrieve the current merged data room configuration, as well as the
        history of configuration commits that have already been merged.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomConfigurationHistoryRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomConfigurationHistoryRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomConfigurationHistoryResponse"):
            raise Exception(
                "Expected retrieveDataRoomConfigurationHistoryResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveDataRoomConfigurationHistoryResponse

    def retrieve_current_data_room_configuration(
            self,
            data_room_id: str
    ) -> Tuple[DataRoomConfiguration, str]:
        """
        Retrieve the current data room configuration, as well as the current "history pin".

        A history pin is the hash of all the ids of configuration commits that
        make up the structure of a data room. This pin therefore uniquely identifies
        a data room's structure at a certain point in time.
        A data room configuration, as well as its associated history pin, can be used
        to extend an existing data room (for example by adding new compute nodes).
        Extending an existing data room is done using the `DataRoomCommitBuilder` class.
        """
        response = self.retrieve_data_room_configuration_history(data_room_id)
        return (response.currentConfiguration, response.pin.hex())

    def stop_data_room(self, data_room_id: str):
        """
        Stop the data room with the given id, making it impossible to run new
        computations.
        """
        self._update_data_room_status(data_room_id, DataRoomStatus.Value("Stopped"))

    def _update_data_room_status(
            self,
            data_room_id: str,
            status # type: DataRoomStatus.V
    ) -> UpdateDataRoomStatusResponse:
        """
        Update the status of the data room.

        For the special case of stopping a data room, the method
        `stop_data_room` can be used.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = UpdateDataRoomStatusRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
            status=status,
        )
        responses = self.send_request(
            GcgRequest(updateDataRoomStatusRequest=request),
            protocol
        )

        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if response.HasField("updateDataRoomStatusResponse"):
            if self.is_integrated_with_platform:
                self.platform._platform_api.update_data_room_status(data_room_id, status)
        else:
            raise Exception(
                "Expected updateDataRoomStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.updateDataRoomStatusResponse

    def retrieve_data_room_status(
            self,
            data_room_id: str
    ) -> str:
        """
        Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveDataRoomStatusRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomStatusResponse"):
            raise Exception(
                "Expected retrieveDataRoomStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return DataRoomStatus.Name(response.retrieveDataRoomStatusResponse.status)

    def retrieve_data_room_definition(
            self,
            data_room_id: str
    ) -> RetrieveDataRoomResponse:
        """
        Returns the underlying protobuf object for the data room.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomResponse"):
            raise Exception(
                "Expected retrieveDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveDataRoomResponse

    def retrieve_audit_log(
            self,
            data_room_id: str
    ) -> RetrieveAuditLogResponse:
        """
        Returns the audit log for the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveAuditLogRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveAuditLogRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveAuditLogResponse"):
            raise Exception(
                "Expected retrieveAuditLogResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveAuditLogResponse

    def publish_dataset(
            self,
            data_room_id: str,
            manifest_hash: str,
            leaf_id: str,
            key: Key,
            *,
            force: bool = False
    ) -> PublishDatasetToDataRoomResponse:
        """
        Publishes a file and its encryption key to a data room.
        Neither the file nor the encryption key will ever be stored in
        unencrypted form.

        This method will check whether the to-be-published file exists.
        If this is not the case, an exception will be raised.
        This behavior can be disabled by setting the `force` flag.

        In case the original client was created with platform integration
        enabled, the method will further check whether there already
        is a dataset published for the given data room.
        In this case, an exception will be thrown and the dataset
        will need to be unpublished first.
        """

        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        dataset = self.client.get_dataset(manifest_hash)
        if not dataset and not force:
            raise Exception(
                "The dataset you are trying to publish does not exist"
            )

        should_create_dataset_links =\
            self.is_integrated_with_platform and \
            self._is_web_data_room(data_room_id)

        if should_create_dataset_links:
            existing_link = self.platform._platform_api.get_dataset_link(
                data_room_id,
                self._convert_data_node_name_to_verifier_node(leaf_id)
            )
            if existing_link:
                existing_dataset = existing_link["dataset"]["datasetId"]
                raise Exception(
                    "The following dataset has already been published for this node." +
                    " Please unpublish this dataset first." +
                    f" Dataset: '{existing_dataset}'"
                )

        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = PublishDatasetToDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            datasetHash=bytes.fromhex(manifest_hash),
            leafName=leaf_id,
            encryptionKey=key.material,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(publishDatasetToDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("publishDatasetToDataRoomResponse"):
            raise Exception(
                "Expected publishDatasetToDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if should_create_dataset_links:
                self.platform._platform_api.create_dataset_link(
                    data_room_id,
                    manifest_hash,
                    self._convert_data_node_name_to_verifier_node(leaf_id)
                )

        return response.publishDatasetToDataRoomResponse

    def _convert_data_node_name_to_verifier_node(self, node_name: str) -> str:
        pattern = re.compile(r"@table/(.*)/dataset")
        match = pattern.match(node_name)
        if match:
            return match.group(1)
        else:
            return node_name

    def remove_published_dataset(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> RemovePublishedDatasetResponse:
        """
        Removes a published dataset from the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RemovePublishedDatasetRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
            leafName=leaf_id,
        )
        responses = self.send_request(
            GcgRequest(removePublishedDatasetRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("removePublishedDatasetResponse"):
            raise Exception(
                "Expected removePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if self.is_integrated_with_platform and self._is_web_data_room(data_room_id):
                try:
                    self.platform._platform_api.delete_dataset_link(
                        data_room_id,
                        self._convert_data_node_name_to_verifier_node(leaf_id)
                    )
                except Exception as error:
                    print(f"Error when deleting dataset link on platform: {error}")

        return response.removePublishedDatasetResponse

    def retrieve_published_datasets(
            self,
            data_room_id: str,
    ) -> RetrievePublishedDatasetsResponse:
        """
        Returns the datasets published to the given data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrievePublishedDatasetsRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrievePublishedDatasetsRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrievePublishedDatasetsResponse"):
            raise Exception(
                "Expected retrievePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrievePublishedDatasetsResponse

    def _submit_dev_compute(
            self,
            configuration_commit_id: str,
            compute_node_ids: List[str],
            /, *,
            dry_run: bool = False,
    ) -> ExecuteComputeResponse:
        """
        Submits a computation request which will generate an execution plan to
        perform the computation of the goal nodes.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = ExecuteDevelopmentComputeRequest(
            configurationCommitId=bytes.fromhex(configuration_commit_id),
            computeNodeNames=compute_node_ids,
            isDryRun=dry_run,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(executeDevelopmentComputeRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("executeComputeResponse"):
            raise Exception(
                "Expected executeComputeResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.executeComputeResponse

    def _submit_compute(
            self,
            data_room_id: str,
            compute_node_ids: List[str],
            /, *,
            dry_run: bool = False,
    ) -> ExecuteComputeResponse:
        """
        Submits a computation request which will generate an execution plan to
        perform the computation of the goal nodes.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = ExecuteComputeRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            computeNodeNames=compute_node_ids,
            isDryRun=dry_run,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(executeComputeRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("executeComputeResponse"):
            raise Exception(
                "Expected executeComputeResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.executeComputeResponse

    def get_computation_status(self, job_id: str) -> JobStatusResponse:
        """
        Returns the status of the provided `job_id`, which will include the names
        of the nodes that have completed their execution.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = JobStatusRequest(
            jobId=bytes.fromhex(job_id),
        )
        responses = self.send_request(
            GcgRequest(jobStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("jobStatusResponse"):
            raise Exception(
                "Expected jobStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.jobStatusResponse

    def _stream_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> Iterator[GetResultsResponseChunk]:
        """
        Streams the results of the provided `job_id`.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = GetResultsRequest(
            jobId=job_id,
            computeNodeName=compute_node_id,
        )
        responses = self.send_request(
            GcgRequest(getResultsRequest=request),
            protocol
        )
        for response in responses:
            if response.HasField("getResultsResponseChunk"):
                yield response.getResultsResponseChunk
            elif response.HasField("getResultsResponseFooter"):
                return
            else:
                raise Exception(
                    "Expected getResultsResponseChunk or getResultsResponseFooter, got "
                    + str(response.WhichOneof("gcg_response"))
                )
        raise Exception("Enclave connection aborted while streaming results")

    def _get_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> bytes:
        """
        Returns the results of the provided `job_id`.
        """
        return b"".join(list(map(lambda chunk: chunk.data, self._stream_job_results(job_id, compute_node_id))))

    def run_computation(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the data room with the given id.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_compute(data_room_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)

    def wait_until_computation_has_finished(
        self,
        job_id: JobId,
        /, *,
        interval: int = 5,
        timeout: int = None
    ):
        """
        Wait for the given job to complete.

        The method will check for the job's completeness every `interval` seconds and up to
        an optional `timeout` seconds after which the method will raise an exception.
        """
        elapsed = 0
        while True:
            if timeout is not None and elapsed > timeout:
                raise Exception(
                    f"Timeout when trying to get result for job {job_id.id} of"
                    f" {job_id.compute_node_id} (waited {timeout} seconds)"
                )
            elif job_id.compute_node_id in self.get_computation_status(job_id.id).completeComputeNodeNames:
                break
            else:
                sleep(interval)
                elapsed += interval

    def run_dev_computation(
            self,
            configuration_commit_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the context of the data room configuration
        defined by the given commit id.
        Such "development" computations can also be run for configuration commits
        that have not yet been merged.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_dev_compute(configuration_commit_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)

    def get_computation_result(
            self,
            job_id: JobId,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> bytes:
        """
        Wait for the given job to complete and retrieve its results as a raw byte string.

        The method will check for the job's completeness every `interval` seconds and up to
        an optional `timeout` seconds after which the method will raise an exception.
        If the job completes and the results can be retrieved successfully, a raw byte string
        will be returned. The byte string can be transformed into a more useful object using
        a variety of helper methods. These helper methods are specific to the type of computation
        you ran and can be found in the corresponding packages.
        """
        job_id_bytes = bytes.fromhex(job_id.id)
        self.wait_until_computation_has_finished(
            job_id,
            interval=interval,
            timeout=timeout
        )
        results = self._get_job_results(job_id_bytes, job_id.compute_node_id)
        return results

    def run_computation_and_get_results(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> Optional[bytes]:
        """
        Run a specific computation and return its results.

        This method is simply a wrapper for running `run_computation` and `get_computation_result`
        directly after each other.
        """
        job_id = self.run_computation(data_room_id, compute_node_id)
        return self.get_computation_result(
            job_id,
            interval=interval,
            timeout=timeout
        )

    def _create_data_room_in_platform(
            self,
            data_room_definition: DataRoom,
            data_room_hash: bytes
    ):
        attestation_spec_type =\
                self.driver_attestation_specification.WhichOneof("attestation_specification")

        if attestation_spec_type == "intelEpid":
            mrenclave = self.driver_attestation_specification.intelEpid.mrenclave
        elif attestation_spec_type == "intelDcap":
            mrenclave = self.driver_attestation_specification.intelDcap.mrenclave
        else:
            raise Exception("Unknown attestation specification type")

        attestation_specification_hash = hashlib.sha256(
            serialize_length_delimited(
                self.driver_attestation_specification
            )
        ).hexdigest()

        self.platform._platform_api.publish_data_room(
            data_room_definition,
            data_room_hash=data_room_hash.hex(),
            attestation_specification_hash=attestation_specification_hash
        )

    def _is_web_data_room(self, data_room_id: str) -> bool:
        data_room = self.platform._platform_api.get_data_room_by_hash(data_room_id)
        if data_room:
            return data_room["source"] == "WEB"
        else:
            raise Exception(f"Unable to find data room with id '{data_room_id}'")

    @property
    def platform(self) -> SessionPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the
        Decentriq platform.

        This field exposes an object that provides a set of convenience features
        known from the Decentriq web platform. These include, for example,
        getting the list of data sets that are also visible in the browser-based
        platform UI.
        """
        if self._platform:
            return self._platform
        else:
            raise Exception(
                "This field is not set as the client from wich this session has"
                " been derived has not been configured with integration to the web"
                " platform."
            )

    @property
    def is_integrated_with_platform(self):
        return self._platform is not None

Class for managing the communication with an enclave.

#   Session( client: decentriq_platform.client.Client, session_id: str, driver_attestation_specification: attestation_pb2.AttestationSpecification, client_protocols: List[int], auth: decentriq_platform.authentication.Auth, platform_api: Optional[decentriq_platform.platform.PlatformApi] = None )
View Source
    def __init__(
            self,
            client: Client,
            session_id: str,
            driver_attestation_specification: AttestationSpecification,
            client_protocols: List[int],
            auth: Auth,
            platform_api: Optional[PlatformApi] = None,
    ):
        """
        `Session` instances should not be instantiated directly but rather
        be created via `decentriq_platform.Client.create_session` on a `Client` object.
        """
        url = Endpoints.SESSION_FATQUOTE.replace(":sessionId", session_id)
        response: FatquoteResBody = client._api.get(url).json()
        fatquote_bytes = b64decode(response["fatquoteBase64"])
        fatquote = Fatquote()
        fatquote.ParseFromString(fatquote_bytes)
        verification = Verification(attestation_specification=driver_attestation_specification)
        report_data = verification.verify(fatquote)
        self.client = client
        self.session_id = session_id
        self.auth = auth
        self.email = auth.user_id
        self.keypair = chily.Keypair.from_random()
        self.fatquote = fatquote
        self.report_data = report_data
        self.driver_attestation_specification = driver_attestation_specification
        self.client_protocols = client_protocols

        if platform_api:
            self._platform = SessionPlatformFeatures(self, platform_api)
        else:
            self._platform = None

Session instances should not be instantiated directly but rather be created via decentriq_platform.Client.create_session on a Client object.

#   def send_request( self, request: gcg_pb2.GcgRequest, protocol: int ) -> List[gcg_pb2.GcgResponse]:
View Source
    def send_request(
            self,
            request: GcgRequest,
            protocol: int,
    ) -> List[GcgResponse]:
        """
        Low-level method for sending a raw `GcgRequest` to the enclave.
        Use this method if any of the convenience methods (such as `run_computation`) don't perform
        the exact task you want.
        """
        gcg_protocol = serialize_length_delimited(
            ComputeNodeProtocol(
                version=protocol
            )
        )
        serialized_request = serialize_length_delimited(
            Request(
                deltaRequest= self._encrypt_and_encode_data(
                    gcg_protocol + serialize_length_delimited(request),
                    self.auth
                )
            )
        )
        url = Endpoints.SESSION_MESSAGES.replace(":sessionId", self.session_id)
        enclave_request = EnclaveMessage(data=b64encode(serialized_request).decode("ascii"))
        enclave_response: bytes = self.client._api.post(
            url, json.dumps(enclave_request), {"Content-type": "application/json", "Accept-Version": "2"}
        ).content

        responses: List[GcgResponse] = []
        offset = 0
        while offset < len(enclave_response):
            response_container = Response()
            offset += parse_length_delimited(enclave_response[offset:], response_container)
            if response_container.HasField("unsuccessfulResponse"):
                raise Exception(response_container.unsuccessfulResponse)
            else:
                response = GcgResponse()
                decrypted_response = self._decode_and_decrypt_data(
                    response_container.successfulResponse
                )
                response_protocol = ComputeNodeProtocol()
                response_offset = parse_length_delimited(
                        decrypted_response, response_protocol
                )
                if response_protocol.version != protocol:
                    raise Exception("Different response protocol version than requested")
                parse_length_delimited(decrypted_response[response_offset:], response)
                if response.HasField("failure"):
                    raise Exception(response.failure)
                responses.append(response)
        return responses

Low-level method for sending a raw GcgRequest to the enclave. Use this method if any of the convenience methods (such as run_computation) don't perform the exact task you want.
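
For example, a minimal sketch of calling the low-level API directly. It assumes an existing `session` and a hex-encoded `data_room_id`, that the required proto messages can be imported from `decentriq_platform.proto`, and that protocol version 1 is supported by the enclave:

    from decentriq_platform.proto import GcgRequest, RetrieveDataRoomRequest

    # Build the raw request and send it with an explicit protocol version.
    request = GcgRequest(
        retrieveDataRoomRequest=RetrieveDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
    )
    responses = session.send_request(request, 1)
    if not responses[0].HasField("retrieveDataRoomResponse"):
        raise Exception("Unexpected response type")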

#   def publish_data_room( self, data_room_definition: 'Tuple[DataRoom, List[ConfigurationModification]]' ) -> str:
View Source
    def publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]]
    ) -> str:
        """
        Create a data room with the provided protobuf configuration object
        and have the enclave apply the given list of modifications to the data
        room configuration.

        The id returned from this method will be used when interacting with the
        published data room (for example when running computations or publishing
        datasets).
        """
        response = self._publish_data_room(data_room_definition)
        return _get_data_room_id(response).hex()

Create a data room with the provided protobuf configuration object and have the enclave apply the given list of modifications to the data room configuration.

The id returned from this method will be used when interacting with the published data room (for example when running computations or publishing datasets).
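
A minimal sketch of publishing a data room, under the assumption that `builder` is a fully configured `DataRoomBuilder` whose `build()` method returns the `(DataRoom, [ConfigurationModification])` tuple this method expects:

    # `builder` is assumed to be a configured DataRoomBuilder (see its documentation).
    data_room_definition = builder.build()
    data_room_id = session.publish_data_room(data_room_definition)
    print(f"Published data room with id {data_room_id}")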

#   def publish_data_room_configuration_commit(self, configuration_commit: 'ConfigurationCommit') -> str:
View Source
    def publish_data_room_configuration_commit(
            self,
            configuration_commit: ConfigurationCommit
    ) -> str:
        """
        Publish the given data room configuration commit.

        Configuration commits can be built using a `DataRoomCommitBuilder` object.

        The id returned from this method will be used when running development
        computations or when trying to merge this commit into the main
        data room configuration.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": configuration_commit.dataRoomId.hex(),
            }
        )
        request = CreateConfigurationCommitRequest(
            commit=configuration_commit,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(createConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("createConfigurationCommitResponse"):
            raise Exception(
                "Expected createConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.createConfigurationCommitResponse.commitId.hex()

Publish the given data room configuration commit.

Configuration commits can be built using a DataRoomCommitBuilder object.

The id returned from this method will be used when running development computations or when trying to merge this commit into the main data room configuration.
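
A short sketch, assuming `commit_builder` is a configured `DataRoomCommitBuilder` whose `build()` returns the `ConfigurationCommit` to publish:

    configuration_commit = commit_builder.build()
    commit_id = session.publish_data_room_configuration_commit(configuration_commit)
    # The returned id can be used with run_dev_computation or the merge workflow.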

#   def retrieve_configuration_commit(self, configuration_commit_id: str) -> 'ConfigurationCommit':
View Source
    def retrieve_configuration_commit(
        self,
        configuration_commit_id: str,
    ) -> ConfigurationCommit:
        """
        Retrieve the content of the configuration commit with the given id.

        **Returns**:
        A `ConfigurationCommit`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitResponse.commit

Retrieve the content of the configuration commit with the given id.

Returns: A ConfigurationCommit.

#   def retrieve_configuration_commit_approvers(self, configuration_commit_id: str) -> List[str]:
View Source
    def retrieve_configuration_commit_approvers(
        self,
        configuration_commit_id: str,
    ) -> List[str]:
        """
        Retrieve the list of users who need to approve the merger of a given
        configuration commit.

        **Returns**:
        A list of ids belonging to the users that need to approve the
        configuration commit.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitApproversRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitApproversRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitApproversResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitApproversResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitApproversResponse.approvers

Retrieve the list of users who need to approve the merger of a given configuration commit.

Returns: A list of ids belonging to the users that need to approve the configuration commit.

#   def generate_merge_approval_signature(self, configuration_commit_id: str) -> bytes:
View Source
    def generate_merge_approval_signature(
            self,
            configuration_commit_id: str
    ) -> bytes:
        """
        Generate an approval signature required for merging a configuration
        commit.

        To merge a specific configuration commit, each user referenced in the list
        of ids returned by `retrieve_configuration_commit_approvers` needs to
        generate an approval signature using this method.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = GenerateMergeApprovalSignatureRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(generateMergeApprovalSignatureRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("generateMergeApprovalSignatureResponse"):
            raise Exception(
                "Expected generateMergeApprovalSignatureResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.generateMergeApprovalSignatureResponse.signature

Generate an approval signature required for merging a configuration commit.

To merge a specific configuration commit, each user referenced in the list of ids returned by retrieve_configuration_commit_approvers needs to generate an approval signature using this method.

#   def merge_configuration_commit( self, configuration_commit_id: str, approval_signatures: Dict[str, bytes] ) -> 'MergeConfigurationCommitResponse':
View Source
    def merge_configuration_commit(
        self,
        configuration_commit_id: str,
        approval_signatures: Dict[str, bytes],
    ) -> MergeConfigurationCommitResponse:
        """
        Request the enclave to merge the given configuration commit into the
        main data room configuration.

        **Parameters**:
        - `configuration_commit_id`: The id of the commit to be merged.
        - `approval_signatures`: A dictionary containing the approval signature for
            each of the required approvers, e.g. `{ "some@email.com": signature }`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = MergeConfigurationCommitRequest(
            scope=bytes.fromhex(scope_id),
            commitId=bytes.fromhex(configuration_commit_id),
            approvalSignatures=approval_signatures,
        )
        responses = self.send_request(
            GcgRequest(mergeConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("mergeConfigurationCommitResponse"):
            raise Exception(
                "Expected mergeConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.mergeConfigurationCommitResponse

Request the enclave to merge the given configuration commit into the main data room configuration.

Parameters:

  • configuration_commit_id: The id of the commit to be merged.
  • approval_signatures: A dictionary containing the approval signature for each of the required approvers, e.g. { "some@email.com": signature }.
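
The full approval and merge workflow might look as follows. This is only a sketch: in practice each approver generates their signature with their own `Session`, so the `sessions_by_email` mapping used here is a hypothetical stand-in for collecting signatures from the other parties.

    # Find out who has to approve the commit.
    approvers = session.retrieve_configuration_commit_approvers(commit_id)

    # Collect one approval signature per approver (hypothetical mapping from
    # approver email to that approver's own Session object).
    approval_signatures = {
        email: sessions_by_email[email].generate_merge_approval_signature(commit_id)
        for email in approvers
    }

    # Ask the enclave to merge the commit into the main configuration.
    session.merge_configuration_commit(commit_id, approval_signatures)
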
#   def retrieve_data_room_configuration_history( self, data_room_id: str ) -> gcg_pb2.RetrieveDataRoomConfigurationHistoryResponse:
View Source
    def retrieve_data_room_configuration_history(
        self,
        data_room_id: str
    ) -> RetrieveDataRoomConfigurationHistoryResponse:
        """
        Retrieve the current merged data room configuration, as well as the
        history of configuration commits that have already been merged.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomConfigurationHistoryRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomConfigurationHistoryRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomConfigurationHistoryResponse"):
            raise Exception(
                "Expected retrieveDataRoomConfigurationHistoryResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveDataRoomConfigurationHistoryResponse

Retrieve the current merged data room configuration, as well as the history of configuration commits that have already been merged.

#   def retrieve_current_data_room_configuration(self, data_room_id: str) -> 'Tuple[DataRoomConfiguration, str]':
View Source
    def retrieve_current_data_room_configuration(
            self,
            data_room_id: str
    ) -> Tuple[DataRoomConfiguration, str]:
        """
        Retrieve the current data room configuration, as well as the current "history pin".

        A history pin is the hash of all the ids of configuration commits that
        make up the structure of a data room. This pin therefore uniquely identifies
        a data room's structure at a certain point in time.
        A data room configuration, as well as its associated history pin, can be used
        to extend an existing data room (for example by adding new compute nodes).
        Extending an existing data room is done using the `DataRoomCommitBuilder` class.
        """
        response = self.retrieve_data_room_configuration_history(data_room_id)
        return (response.currentConfiguration, response.pin.hex())

Retrieve the current data room configuration, as well as the current "history pin".

A history pin is the hash of all the ids of configuration commits that make up the structure of a data room. This pin therefore uniquely identifies a data room's structure at a certain point in time. A data room configuration, as well as its associated history pin, can be used to extend an existing data room (for example by adding new compute nodes). Extending an existing data room is done using the DataRoomCommitBuilder class.
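
For example (assuming an existing `session` and `data_room_id`), retrieving the configuration and the pin that would later be passed to a `DataRoomCommitBuilder`:

    configuration, history_pin = session.retrieve_current_data_room_configuration(data_room_id)
    print(f"Current history pin: {history_pin}")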

#   def stop_data_room(self, data_room_id: str):
View Source
    def stop_data_room(self, data_room_id: str):
        """
        Stop the data room with the given id, making it impossible to run new
        computations.
        """
        self._update_data_room_status(data_room_id, DataRoomStatus.Value("Stopped"))

Stop the data room with the given id, making it impossible to run new computations.

#   def retrieve_data_room_status(self, data_room_id: str) -> str:
View Source
    def retrieve_data_room_status(
            self,
            data_room_id: str
    ) -> str:
        """
        Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveDataRoomStatusRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomStatusResponse"):
            raise Exception(
                "Expected retrieveDataRoomStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return DataRoomStatus.Name(response.retrieveDataRoomStatusResponse.status)

Returns the status of the data room. Valid values are "Active" or "Stopped".
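
For example, stopping a data room and verifying the new status afterwards (sketch, assuming an existing `session` and `data_room_id`):

    session.stop_data_room(data_room_id)
    assert session.retrieve_data_room_status(data_room_id) == "Stopped"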

#   def retrieve_data_room_definition(self, data_room_id: str) -> gcg_pb2.RetrieveDataRoomResponse:
View Source
    def retrieve_data_room_definition(
            self,
            data_room_id: str
    ) -> RetrieveDataRoomResponse:
        """
        Returns the underlying protobuf object for the data room.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomResponse"):
            raise Exception(
                "Expected retrieveDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveDataRoomResponse

Returns the underlying protobuf object for the data room.

#   def retrieve_audit_log(self, data_room_id: str) -> gcg_pb2.RetrieveAuditLogResponse:
View Source
    def retrieve_audit_log(
            self,
            data_room_id: str
    ) -> RetrieveAuditLogResponse:
        """
        Returns the audit log for the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveAuditLogRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveAuditLogRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveAuditLogResponse"):
            raise Exception(
                "Expected retrieveAuditLogResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveAuditLogResponse

Returns the audit log for the data room.
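
A short sketch of printing the audit log. The `log` field name on the returned protobuf object is an assumption; consult the Proto section for the exact message layout.

    audit_log_response = session.retrieve_audit_log(data_room_id)
    # The log is assumed to be a UTF-8 encoded byte string.
    print(audit_log_response.log.decode("utf-8"))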

#   def publish_dataset( self, data_room_id: str, manifest_hash: str, leaf_id: str, key: decentriq_platform.storage.Key, *, force: bool = False ) -> gcg_pb2.PublishDatasetToDataRoomResponse:
View Source
    def publish_dataset(
            self,
            data_room_id: str,
            manifest_hash: str,
            leaf_id: str,
            key: Key,
            *,
            force: bool = False
    ) -> PublishDatasetToDataRoomResponse:
        """
        Publishes a file and its encryption key to a data room.
        Neither the file nor the encryption key will ever be stored in
        unencrypted form.

        This method will check whether the to-be-published file exists.
        If this is not the case, an exception will be raised.
        This behavior can be disabled by setting the `force` flag.

        In case the original client was created with platform integration
        enabled, the method will further check whether there already
        is a dataset published for the given data room.
        In this case, an exception will be thrown and the dataset
        will need to be unpublished first.
        """

        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        dataset = self.client.get_dataset(manifest_hash)
        if not dataset and not force:
            raise Exception(
                "The dataset you are trying to publish does not exist"
            )

        should_create_dataset_links =\
            self.is_integrated_with_platform and \
            self._is_web_data_room(data_room_id)

        if should_create_dataset_links:
            existing_link = self.platform._platform_api.get_dataset_link(
                data_room_id,
                self._convert_data_node_name_to_verifier_node(leaf_id)
            )
            if existing_link:
                existing_dataset = existing_link["dataset"]["datasetId"]
                raise Exception(
                    "The following dataset has already been published for this node." +
                    " Please unpublish this dataset first." +
                    f" Dataset: '{existing_dataset}'"
                )

        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = PublishDatasetToDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            datasetHash=bytes.fromhex(manifest_hash),
            leafName=leaf_id,
            encryptionKey=key.material,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(publishDatasetToDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("publishDatasetToDataRoomResponse"):
            raise Exception(
                "Expected publishDatasetToDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if should_create_dataset_links:
                self.platform._platform_api.create_dataset_link(
                    data_room_id,
                    manifest_hash,
                    self._convert_data_node_name_to_verifier_node(leaf_id)
                )

        return response.publishDatasetToDataRoomResponse

Publishes a file and its encryption key to a data room. Neither the file nor the encryption key will ever be stored in unencrypted form.

This method will check whether the to-be-published file exists. If this is not the case, an exception will be raised. This behavior can be disabled by setting the force flag.

In case the original client was created with platform integration enabled, the method will further check whether there already is a dataset published for the given data room. In this case, an exception will be thrown and the dataset will need to be unpublished first.
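
A sketch of the publishing step. It assumes the dataset was previously encrypted and uploaded through the `Client` with the same `key`, yielding `manifest_hash`, and that `"salary_data"` is a hypothetical leaf id defined in the data room:

    from decentriq_platform.storage import Key

    key = Key()  # assumed to generate fresh key material when none is supplied
    # ... upload the dataset with this key via the Client (see the Client docs),
    # obtaining `manifest_hash` ...
    session.publish_dataset(
        data_room_id,
        manifest_hash,
        "salary_data",  # hypothetical leaf id
        key=key,
    )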

#   def remove_published_dataset( self, data_room_id: str, leaf_id: str ) -> gcg_pb2.RemovePublishedDatasetResponse:
View Source
    def remove_published_dataset(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> RemovePublishedDatasetResponse:
        """
        Removes a published dataset from the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RemovePublishedDatasetRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
            leafName=leaf_id,
        )
        responses = self.send_request(
            GcgRequest(removePublishedDatasetRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("removePublishedDatasetResponse"):
            raise Exception(
                "Expected removePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if self.is_integrated_with_platform and self._is_web_data_room(data_room_id):
                try:
                    self.platform._platform_api.delete_dataset_link(
                        data_room_id,
                        self._convert_data_node_name_to_verifier_node(leaf_id)
                    )
                except Exception as error:
                    print(f"Error when deleting dataset link on platform: {error}")

        return response.removePublishedDatasetResponse

Removes a published dataset from the data room.

#   def retrieve_published_datasets(self, data_room_id: str) -> gcg_pb2.RetrievePublishedDatasetsResponse:
View Source
    def retrieve_published_datasets(
            self,
            data_room_id: str,
    ) -> RetrievePublishedDatasetsResponse:
        """
        Returns the datasets published to the given data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrievePublishedDatasetsRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrievePublishedDatasetsRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrievePublishedDatasetsResponse"):
            raise Exception(
                "Expected retrievePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrievePublishedDatasetsResponse

Returns the datasets published to the given data room.
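
For example, listing the published datasets and unpublishing one leaf. The `publishedDatasets`, `leafName` and `datasetHash` field names on the response protobuf are assumptions; see the Proto section for the exact definition.

    published = session.retrieve_published_datasets(data_room_id)
    for entry in published.publishedDatasets:
        print(entry.leafName, entry.datasetHash.hex())

    session.remove_published_dataset(data_room_id, "salary_data")  # hypothetical leaf id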

#   def get_computation_status(self, job_id: str) -> gcg_pb2.JobStatusResponse:
View Source
    def get_computation_status(self, job_id: str) -> JobStatusResponse:
        """
        Returns the status of the provided `job_id`, which will include the names
        of the nodes that have completed their execution.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = JobStatusRequest(
            jobId=bytes.fromhex(job_id),
        )
        responses = self.send_request(
            GcgRequest(jobStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("jobStatusResponse"):
            raise Exception(
                "Expected jobStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.jobStatusResponse

Returns the status of the provided job_id, which will include the names of the nodes that have completed their execution.

#   def run_computation( self, data_room_id: str, compute_node_id: str, /, *, dry_run: bool = False ) -> decentriq_platform.types.JobId:
View Source
    def run_computation(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the data room with the given id.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_compute(data_room_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)

Run a specific computation within the data room with the given id.

The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.

#   def wait_until_computation_has_finished( self, job_id: decentriq_platform.types.JobId, /, *, interval: int = 5, timeout: int = None ):
View Source
    def wait_until_computation_has_finished(
        self,
        job_id: JobId,
        /, *,
        interval: int = 5,
        timeout: int = None
    ):
        """
        Wait for the given job to complete.

        The method will check for the job's completeness every `interval` seconds and up to
        an optional `timeout` seconds after which the method will raise an exception.
        """
        elapsed = 0
        while True:
            if timeout is not None and elapsed > timeout:
                raise Exception(
                    f"Timeout when trying to get result for job {job_id.id} of"
                    f" {job_id.compute_node_id} (waited {timeout} seconds)"
                )
            elif job_id.compute_node_id in self.get_computation_status(job_id.id).completeComputeNodeNames:
                break
            else:
                sleep(interval)
                elapsed += interval

Wait for the given job to complete.

The method will check for the job's completeness every interval seconds and up to an optional timeout seconds after which the method will raise an exception.
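
For example, to poll every 10 seconds and give up after 10 minutes (a sketch, reusing the `session` and `job` objects from the previous examples):

```
try:
    # Poll every 10 seconds; an exception is raised after 600 seconds without completion.
    session.wait_until_computation_has_finished(job, interval=10, timeout=600)
except Exception as e:
    print(f"Job did not finish in time: {e}")
```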

#   def run_dev_computation( self, configuration_commit_id: str, compute_node_id: str, /, *, dry_run: bool = False ) -> decentriq_platform.types.JobId:
View Source
    def run_dev_computation(
            self,
            configuration_commit_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the context of the data room configuration
        defined by the given commit id.
        Such "development" computations can also be run for configuration commits
        that have not yet been merged.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_dev_compute(configuration_commit_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)

Run a specific computation within the context of the data room configuration defined by the given commit id. Such "development" computations can also be run for configuration commits that have not yet been merged.

The result will be an identifier object of the job executing the computation. This object is required for checking a job's status and retrieving its results.

#   def get_computation_result( self, job_id: decentriq_platform.types.JobId, /, *, interval: int = 5, timeout: int = None ) -> bytes:
View Source
    def get_computation_result(
            self,
            job_id: JobId,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> bytes:
        """
        Wait for the given job to complete and retrieve its results as a raw byte string.

        The method will check for the job's completeness every `interval` seconds and up to
        an optional `timeout` seconds after which the method will raise an exception.
        If the job completes and the results can be retrieved successfully, a raw byte string
        will be returned. The byte string can be transformed into a more useful object using
        a variety of helper methods. These helper methods are specific to the type of computation
        you ran and can be found in the corresponding packages.
        """
        elapsed = 0
        job_id_bytes = bytes.fromhex(job_id.id)
        self.wait_until_computation_has_finished(
            job_id,
            interval=interval,
            timeout=timeout
        )
        results = self._get_job_results(job_id_bytes, job_id.compute_node_id)
        return results

Wait for the given job to complete and retrieve its results as a raw byte string.

The method will check for the job's completeness every interval seconds and up to an optional timeout seconds after which the method will raise an exception. If the job completes and the results can be retrieved successfully, a raw byte string will be returned. The byte string can be transformed into a more useful object using a variety of helper methods. These helper methods are specific to the type of computation you ran and can be found in the corresponding packages.
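
For example, the raw result can be stored to disk for later processing (a sketch; `session` and `job` are assumed to exist as in the previous examples, and how the bytes should be interpreted depends on the compute module that produced them):

```
# Blocks until the job has finished, then downloads the raw result bytes.
raw_result = session.get_computation_result(job, interval=5, timeout=600)

# The result is an opaque byte string; persist it or pass it to the reader
# helper provided by the compute module you used.
with open("result.bin", "wb") as f:
    f.write(raw_result)
```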

#   def run_computation_and_get_results( self, data_room_id: str, compute_node_id: str, /, *, interval: int = 5, timeout: int = None ) -> Optional[bytes]:
View Source
    def run_computation_and_get_results(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            interval: int = 5,
            timeout: int = None
    ) -> Optional[bytes]:
        """
        Run a specific computation and return its results.

        This method is simply a wrapper for running `run_computation` and `get_computation_result`
        directly after each other
        """
        job_id = self.run_computation(data_room_id, compute_node_id)
        return self.get_computation_result(
            job_id,
            interval=interval,
            timeout=timeout
        )

Run a specific computation and return its results.

This method is simply a wrapper for running run_computation and get_computation_result directly after each other.
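
In a sketch (same assumptions as in the previous examples):

```
# Equivalent to run_computation followed by get_computation_result.
raw_result = session.run_computation_and_get_results(
    data_room_id,
    "my_computation",
    timeout=600,
)
```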

Provider of a list of convenience methods to interact with the Decentriq platform.

This field exposes an object that provides a set of convenience features known from the Decentriq web platform. These include, for example, getting the list of datasets that are also visible in the browser-based platform UI.

#   is_integrated_with_platform
#   class DataRoomBuilder:
View Source
class DataRoomBuilder():
    """
    A helper class to ease the building process of a data clean room.
    """
    name: str
    governance_protocol: GovernanceProtocolProto
    description: Optional[str]
    owner_email: Optional[str]
    modifications_builder: DataRoomModificationsBuilder

    def __init__(
            self,
            name: str,
            enclave_specs: Dict[str, EnclaveSpecification],
            governance_protocol: GovernanceProtocolProto = GovernanceProtocol.static(),
            *,
            add_basic_user_permissions: bool = True,
            description: str = None,
            owner_email: str = None,
        ) -> None:
        """
        Create a data room builder object.

        **Parameters**:
        - `name`: The name of the data room to be created.
        - `enclave_specs`: The enclave specification set in which to look-up
            enclave specs for enclaves responsible for executing compute nodes.
            These specs are provided by the `decentriq_platform.enclave_specifications`
            catalogue.
        - `governance_protocol`: The protocol that defines whether and how a
            data room can be changed after it has been published.
        - `add_basic_user_permissions`: Whether to add basic user permissions
            for each participant. These are:
            1. Permission to retrieve the data room definition
            2. Permission to retrieve the status of the data room
            3. Permission to retrieve the audit log
            4. Permission to retrieve the list of datasets that have been published to the data room
            5. Permission to run development computations
        - `description`: Description of the data room.
        - `owner_email`: A custom owner of the data room. By default this will
            be set to the owner of the session publishing the data room.
        """
        assert name, "The DCR must have a non-empty name"

        self.modifications_builder = DataRoomModificationsBuilder(
            enclave_specs,
            add_basic_user_permissions=add_basic_user_permissions
        )
        self.name = name
        self.owner_email = owner_email
        self.description = description
        self.governance_protocol = governance_protocol
        self.enclave_specs = enclave_specs

        if description:
            self.add_description(description)

    def add_data_node(
            self,
            name: str,
            is_required: bool = False,
            node_id: Optional[str] = None
    ) -> str:
        """
        Add a new data node. If the node is marked as required, any computation
        which includes it as a dependency will not start if no data has
        been provided yet.

        **Parameters**:
        - `name`: Name of the data node.
        - `is_required`: If true, any computation which depends on this data node
            can only be run if data has been provided for this node.
        - `node_id`: A custom identifier for this node.
            If not specified, the identifier is generated automatically.

        **Returns**:
        The id that was assigned to the added data node.
        This id will be needed when connecting a dataset or when permissions concerning
        this node are defined.
        """
        return self.modifications_builder.add_data_node(
            name,
            is_required=is_required,
            node_id=node_id
        )

    def add_compute_node(
            self,
            node: Node,
            node_id: Optional[str] = None,
            attestation_id: Optional[str] = None
    ) -> str:
        """
        Add a new compute node. Specific compute node classes are provided either
        by the main package or by the respective compute submodules.

        **Parameters**:
        - `node`: The node object to add.
        - `node_id`: A custom identifier for this node. If not specified,
            the identifier is generated automatically.
        - `attestation_id`: An identifier for a concrete attestation specification
            to use for this compute node. If not specified, the specification is
            chosen automatically based on the type of compute node.

        **Returns**:
        The id that was assigned to the added compute node.
        This id is needed when running computations or when adding permissions
        concerning this node.
        """
        return self.modifications_builder.add_compute_node(
            node,
            node_id=node_id,
            attestation_id=attestation_id
        )

    def add_user_permission(
            self,
            email: str,
            authentication_method: AuthenticationMethod,
            permissions: List[Permission],
    ):
        """
        Add permissions for a user.
        The authentication is performed on the enclave side based on the method supplied.
        """
        return self.modifications_builder.add_user_permission(
            email,
            authentication_method,
            permissions
        )

    def add_description(self, description: str):
        """Add a description to the data room being built."""
        self.description = description

    def add_owner_email(self, email: str):
        """
        Specify a custom owner of the data room.
        By default, the current user will be used.
        """
        self.owner_email = email

    def build(self) -> Tuple[DataRoom, List[ConfigurationModification]]:
        """
        Finalize data room construction.

        Built data rooms still need to be published by a
        `decentriq_platform.Session` before they can be interacted with.

        **Returns**:
        A tuple containing the following elements:
        (1) A data room object that stores the core properties of the DCR, such
        as its name, its owner, and the governance protocol that defines what changes
        can be made to the data room's configuration.
        (2) A list of the recorded modifications that will be applied to the initially
        empty data room configuration within the enclave.
        """
        data_room = DataRoom()
        data_room.name = self.name
        if self.owner_email:
            data_room.ownerEmail = self.owner_email
        if self.description:
            data_room.description = self.description
        data_room.id = DataRoomBuilder._generate_id()
        data_room.governanceProtocol.CopyFrom(self.governance_protocol)
        modifications = self.modifications_builder.build()

        return data_room, modifications

    @staticmethod
    def _generate_id():
        return str(uuid.uuid4())

A helper class to ease the building process of a data clean room.

#   DataRoomBuilder( name: str, enclave_specs: Dict[str, decentriq_platform.types.EnclaveSpecification], governance_protocol: data_room_pb2.GovernanceProtocol = staticDataRoomPolicy { } , *, add_basic_user_permissions: bool = True, description: str = None, owner_email: str = None )
View Source
    def __init__(
            self,
            name: str,
            enclave_specs: Dict[str, EnclaveSpecification],
            governance_protocol: GovernanceProtocolProto = GovernanceProtocol.static(),
            *,
            add_basic_user_permissions: bool = True,
            description: str = None,
            owner_email: str = None,
        ) -> None:
        """
        Create a data room builder object.

        **Parameters**:
        - `name`: The name of the data room to be created.
        - `enclave_specs`: The enclave specification set in which to look-up
            enclave specs for enclaves responsible for executing compute nodes.
            These specs are provided by the `decentriq_platform.enclave_specifications`
            catalogue.
        - `governance_protocol`: The protocol that defines whether and how a
            data room can be changed after it has been published.
        - `add_basic_user_permissions`: Whether to add basic user permissions
            for each participant. These are:
            1. Permission to retrieve the data room definition
            2. Permission to retrieve the status of the data room
            3. Permission to retrieve the audit log
            4. Permission to retrieve the list of datasets that have been published to the data room
            5. Permission to run development computations
        - `description`: Description of the data room.
        - `owner_email`: A custom owner of the data room. By default this will
            be set to the owner of the session publishing the data room.
        """
        assert name, "The DCR must have a non-empty name"

        self.modifications_builder = DataRoomModificationsBuilder(
            enclave_specs,
            add_basic_user_permissions=add_basic_user_permissions
        )
        self.name = name
        self.owner_email = owner_email
        self.description = description
        self.governance_protocol = governance_protocol
        self.enclave_specs = enclave_specs

        if description:
            self.add_description(description)

Create a data room builder object.

Parameters:

  • name: The name of the data room to be created.
  • enclave_specs: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the decentriq_platform.enclave_specifications catalogue.
  • governance_protocol: The protocol that defines whether and how a data room can be changed after it has been published.
  • add_basic_user_permissions: Whether to add basic user permissions for each participant. These are:
    1. Permission to retrieve the data room definition
    2. Permission to retrieve the status of the data room
    3. Permission to retrieve the audit log
    4. Permission to retrieve the list of datasets that have been published to the data room
    5. Permission to run development computations
  • description: Description of the data room.
  • owner_email: A custom owner of the data room. By default this will be set to the owner of the session publishing the data room.
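
For example, a builder could be set up as follows (a minimal sketch; the enclave versions, data room name, and email address are placeholders, and the surrounding client/session setup is assumed to have happened as described earlier in this documentation):

```
import decentriq_platform as dq

# Select the enclaves you trust; the driver enclave must always be included.
enclave_specs = dq.enclave_specifications.versions([
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])

builder = dq.DataRoomBuilder(
    "My DCR",
    enclave_specs,
    governance_protocol=dq.GovernanceProtocol.static(),
    description="Example data clean room",
    owner_email="owner@example.com",
)
```
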
#   def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str:
View Source
    def add_data_node(
            self,
            name: str,
            is_required: bool = False,
            node_id: Optional[str] = None
    ) -> str:
        """
        Add a new data node. If the node is marked as required, any computation
        which includes it as a dependency will not start if no data has
        been provided yet.

        **Parameters**:
        - `name`: Name of the data node.
        - `is_required`: If true, any computation which depends on this data node
            can only be run if data has been provided for this node.
        - `node_id`: A custom identifier for this node.
            If not specified, the identifier is generated automatically.

        **Returns**:
        The id that was assigned to the added data node.
        This id will be needed when connecting a dataset or when permissions concerning
        this node are defined.
        """
        return self.modifications_builder.add_data_node(
            name,
            is_required=is_required,
            node_id=node_id
        )

Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start if no data has been provided yet.

Parameters:

  • name: Name of the data node.
  • is_required: If true, any computation which depends on this data node can only be run if data has been provided for this node.
  • node_id: A custom identifier for this node. If not specified, the identifier is generated automatically.

Returns: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions concerning this node are defined.

#   def add_compute_node( self, node: decentriq_platform.node.Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str:
View Source
    def add_compute_node(
            self,
            node: Node,
            node_id: Optional[str] = None,
            attestation_id: Optional[str] = None
    ) -> str:
        """
        Add a new compute node. Specific compute node classes are provided either
        by the main package or by the respective compute submodules.

        **Parameters**:
        - `node`: The node object to add.
        - `node_id`: A custom identifier for this node. If not specified,
            the identifier is generated automatically.
        - `attestation_id`: An identifier for a concrete attestation specification
            to use for this compute node. If not specified, the specification is
            chosen automatically based on the type of compute node.

        **Returns**:
        The id that was assigned to the added compute node.
        This id is needed when running computations or when adding permissions
        concerning this node.
        """
        return self.modifications_builder.add_compute_node(
            node,
            node_id=node_id,
            attestation_id=attestation_id
        )

Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules.

Parameters:

  • node: The node object to add.
  • node_id: A custom identifier for this node. If not specified, the identifier is generated automatically.
  • attestation_id: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node.

Returns: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node.
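
For example, using the `StaticContent` node documented further down this page (a sketch; `builder` is the `DataRoomBuilder` from the example above):

```
# A StaticContent node embeds fixed content directly in the DCR definition.
script_node = dq.StaticContent("my_script", b"print('hello')")
script_node_id = builder.add_compute_node(script_node)
```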

#   def add_user_permission( self, email: str, authentication_method: data_room_pb2.AuthenticationMethod, permissions: List[data_room_pb2.Permission] ):
View Source
    def add_user_permission(
            self,
            email: str,
            authentication_method: AuthenticationMethod,
            permissions: List[Permission],
    ):
        """
        Add permissions for a user.
        The authentication is performed on the enclave side based on the method supplied.
        """
        return self.modifications_builder.add_user_permission(
            email,
            authentication_method,
            permissions
        )

Add permissions for a user. The authentication is performed on the enclave side based on the method supplied.
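
A sketch, assuming `auth_method` is an `AuthenticationMethod` object obtained as described elsewhere in this documentation, and `data_node_id` / `script_node_id` are ids returned by earlier `add_data_node` / `add_compute_node` calls:

```
builder.add_user_permission(
    "analyst@example.com",  # placeholder participant email
    auth_method,            # AuthenticationMethod, obtained elsewhere (assumed)
    [
        dq.Permissions.leaf_crud(data_node_id),
        dq.Permissions.execute_compute(script_node_id),
        dq.Permissions.retrieve_data_room(),
    ],
)
```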

#   def add_description(self, description: str):
View Source
    def add_description(self, description: str):
        """Add a description to the data room being built."""
        self.description = description

Add a description to the data room being built.

#   def add_owner_email(self, email: str):
View Source
    def add_owner_email(self, email: str):
        """
        Specify a custom owner of the data room.
        By default, the current user will be used.
        """
        self.owner_email = email

Specify a custom owner of the data room. By default, the current user will be used.

#   def build( self ) -> Tuple[data_room_pb2.DataRoom, List[data_room_pb2.ConfigurationModification]]:
View Source
    def build(self) -> Tuple[DataRoom, List[ConfigurationModification]]:
        """
        Finalize data room construction.

        Built data rooms still need to be published by a
        `decentriq_platform.Session` before they can be interacted with.

        **Returns**:
        A tuple containing the following elements:
        (1) A data room object that stores the core properties of the DCR, such
        as its name, its owner, and the governance protocol that defines what changes
        can be made to the data room's configuration.
        (2) A list of the recorded modifications that will be applied to the initially
        empty data room configuration within the enclave.
        """
        data_room = DataRoom()
        data_room.name = self.name
        if self.owner_email:
            data_room.ownerEmail = self.owner_email
        if self.description:
            data_room.description = self.description
        data_room.id = DataRoomBuilder._generate_id()
        data_room.governanceProtocol.CopyFrom(self.governance_protocol)
        modifications = self.modifications_builder.build()

        return data_room, modifications

Finalize data room construction.

Built data rooms still need to be published by a decentriq_platform.Session before they can be interacted with.

Returns: A tuple containing the following elements: (1) A data room object that stores the core properties of the DCR, such as its name, its owner, and the governance protocol that defines what changes can be made to the data room's configuration. (2) A list of the recorded modifications that will be applied to the initially empty data room configuration within the enclave.
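
A sketch of the final step (the publishing call itself belongs to the `Session` class and is not shown here):

```
data_room, modifications = builder.build()

# Core DCR properties live on the DataRoom object ...
print(data_room.name, data_room.id)

# ... while the recorded changes travel separately as a list of modifications.
print(len(modifications), "modification(s) recorded")
```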

#   class DataRoomCommitBuilder:
View Source
class DataRoomCommitBuilder:
    """
    A helper class to build a data room configuration commit,
    i.e. a list of modifications that are to be applied to the configuration
    of an existing data room.
    """
    data_room_id: str
    history_pin: str
    modifications_builder: DataRoomModificationsBuilder

    def __init__(
            self,
            data_room_id: str,
            current_configuration: DataRoomConfiguration,
            history_pin: str,
            enclave_specs: Dict[str, EnclaveSpecification],
            *,
            add_basic_user_permissions: bool = False,
    ):
        """
        Create a builder object for constructing new data room
        configuration commits.
        A configuration commit contains a list of modifications that
        can be applied to an existing data room configuration.

        **Parameters**:
        - `data_room_id`: The data room to which the modifications should be
            applied.
        - `current_configuration`: The current configuration of the data room
            to be altered.
        - `history_pin`: The current history pin that identifies a specific
            point in time within a line of configuration changes.
        - `enclave_specs`: The enclave specification set in which to look-up
            enclave specs for enclaves responsible for executing compute nodes.
            These specs are provided by the `decentriq_platform.enclave_specifications`
            catalogue.
        - `add_basic_user_permissions`: Whether to add basic user permissions
            for each participant. These are:
            1. Permission to retrieve the data room definition
            2. Permission to retrieve the status of the data room
            3. Permission to retrieve the audit log
            4. Permission to retrieve the list of datasets that have been published to the data room
            5. Permission to run development computations
        """
        self.data_room_id = data_room_id
        self.history_pin = history_pin
        self.modifications_builder = DataRoomModificationsBuilder(
            enclave_specs,
            data_room_id=data_room_id,
            history_pin=history_pin,
            data_room_configuration=current_configuration,
            add_basic_user_permissions=add_basic_user_permissions,
        )

    def add_data_node(
            self,
            name: str,
            is_required: bool = False,
            node_id: Optional[str] = None
    ) -> str:
        """
        Add a new data node. If the node is marked as required, any computation
        which includes it as a dependency will not start if no data has
        been provided yet.

        **Parameters**:
        - `name`: Name of the data node.
        - `is_required`: If true, any computation which depends on this data node
            can only be run if data has been provided for this node.
        - `node_id`: A custom identifier for this node.
            If not specified, the identifier is generated automatically.

        **Returns**:
        The id that was assigned to the added data node.
        This id will be needed when connecting a dataset or when permissions concerning
        this node are defined.
        """
        return self.modifications_builder.add_data_node(
            name,
            is_required=is_required,
            node_id=node_id
        )

    def change_attestation_specification(
            self,
            attestation_id: str,
            attestation_specification: AttestationSpecification
    ) -> str:
        """
        Replace the attestation specification registered under the given
        id with the provided specification.
        """
        return self.modifications_builder.change_attestation_specification(
            attestation_id, attestation_specification
        )

    def add_compute_node(
            self,
            node: Node,
            node_id: Optional[str] = None,
            attestation_id: Optional[str] = None
    ) -> str:
        """
        Add a new compute node. Specific compute node classes are provided either
        by the main package or by the respective compute submodules.

        **Parameters**:
        - `node`: The node object to add.
        - `node_id`: A custom identifier for this node. If not specified,
            the identifier is generated automatically.
        - `attestation_id`: An identifier for a concrete attestation specification
            to use for this compute node. If not specified, the specification is
            chosen automatically based on the type of compute node.

        **Returns**:
        The id that was assigned to the added compute node.
        This id is needed when running computations or when adding permissions
        concerning this node.
        """
        return self.modifications_builder.add_compute_node(
            node,
            node_id=node_id,
            attestation_id=attestation_id
        )

    def add_user_permission(
            self,
            email: str,
            authentication_method: AuthenticationMethod,
            permissions: List[Permission],
    ):
        """
        Add permissions for a user.
        The authentication is performed on the enclave side based on the method supplied.
        """
        return self.modifications_builder.add_user_permission(
            email,
            authentication_method,
            permissions
        )

    def build(self):
        """
        Build the data room configuration commit.

        The built commit still needs to be published and merged in order for
        it to be made part of the data room configuration.
        """
        return ConfigurationCommit(
            dataRoomId=bytes.fromhex(self.data_room_id),
            dataRoomHistoryPin=bytes.fromhex(self.history_pin),
            modifications=self.modifications_builder.build()
        )

A helper class to build a data room configuration commit, i.e. a list of modifications that are to be applied to the configuration of an existing data room.

#   DataRoomCommitBuilder( data_room_id: str, current_configuration: data_room_pb2.DataRoomConfiguration, history_pin: str, enclave_specs: Dict[str, decentriq_platform.types.EnclaveSpecification], *, add_basic_user_permissions: bool = False )
View Source
    def __init__(
            self,
            data_room_id: str,
            current_configuration: DataRoomConfiguration,
            history_pin: str,
            enclave_specs: Dict[str, EnclaveSpecification],
            *,
            add_basic_user_permissions: bool = False,
    ):
        """
        Create a builder object for constructing new data room
        configuration commits.
        A configuration commit contains a list of modifications that
        can be applied to an existing data room configuration.

        **Parameters**:
        - `data_room_id`: The data room to which the modifications should be
            applied.
        - `current_configuration`: The current configuration of the data room
            to be altered.
        - `history_pin`: The current history pin that identifies a specific
            point in time within a line of configuration changes.
        - `enclave_specs`: The enclave specification set in which to look-up
            enclave specs for enclaves responsible for executing compute nodes.
            These specs are provided by the `decentriq_platform.enclave_specifications`
            catalogue.
        - `add_basic_user_permissions`: Whether to add basic user permissions
            for each participant. These are:
            1. Permission to retrieve the data room definition
            2. Permission to retrieve the status of the data room
            3. Permission to retrieve the audit log
            4. Permission to retrieve the list of datasets that have been published to the data room
            5. Permission to run development computations
        """
        self.data_room_id = data_room_id
        self.history_pin = history_pin
        self.modifications_builder = DataRoomModificationsBuilder(
            enclave_specs,
            data_room_id=data_room_id,
            history_pin=history_pin,
            data_room_configuration=current_configuration,
            add_basic_user_permissions=add_basic_user_permissions,
        )

Create a builder object for constructing new data room configuration commits. A configuration commit contains a list of modifications that can be applied to an existing data room configuration.

Parameters:

  • data_room_id: The data room to which the modifications should be applied.
  • current_configuration: The current configuration of the data room to be altered.
  • history_pin: The current history pin that identifies a specific point in time within a line of configuration changes.
  • enclave_specs: The enclave specification set in which to look-up enclave specs for enclaves responsible for executing compute nodes. These specs are provided by the decentriq_platform.enclave_specifications catalogue.
  • add_basic_user_permissions: Whether to add basic user permissions for each participant. These are:
    1. Permission to retrieve the data room definition
    2. Permission to retrieve the status of the data room
    3. Permission to retrieve the audit log
    4. Permission to retrieve the list of datasets that have been published to the data room
    5. Permission to run development computations
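
For example (a sketch; how the current configuration and history pin of an existing data room are retrieved via the `Session` is assumed here and documented with the `Session` class):

```
commit_builder = dq.DataRoomCommitBuilder(
    data_room_id,            # id of the already published data room
    current_configuration,   # retrieved via the Session (assumed)
    history_pin,             # retrieved via the Session (assumed)
    enclave_specs,
)

# Record a modification, e.g. an additional compute node ...
new_node_id = commit_builder.add_compute_node(dq.Noop("test_run"))

# ... and build the configuration commit, to be published and merged later.
commit = commit_builder.build()
```
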
#   def add_data_node( self, name: str, is_required: bool = False, node_id: Optional[str] = None ) -> str:
View Source
    def add_data_node(
            self,
            name: str,
            is_required: bool = False,
            node_id: Optional[str] = None
    ) -> str:
        """
        Add a new data node. If the node is marked as required, any computation
        which includes it as a dependency will not start if no data has
        been provided yet.

        **Parameters**:
        - `name`: Name of the data node.
        - `is_required`: If true, any computation which depends on this data node
            can only be run if data has been provided for this node.
        - `node_id`: A custom identifier for this node.
            If not specified, the identifier is generated automatically.

        **Returns**:
        The id that was assigned to the added data node.
        This id will be needed when connecting a dataset or when permissions concerning
        this node are defined.
        """
        return self.modifications_builder.add_data_node(
            name,
            is_required=is_required,
            node_id=node_id
        )

Add a new data node. If the node is marked as required, any computation which includes it as a dependency will not start if no data has been provided yet.

Parameters:

  • name: Name of the data node.
  • is_required: If true, any computation which depends on this data node can only be run if data has been provided for this node.
  • node_id: A custom identifier for this node. If not specified, the identifier is generated automatically.

Returns: The id that was assigned to the added data node. This id will be needed when connecting a dataset or when permissions concerning this node are defined.

#   def change_attestation_specification( self, attestation_id: str, attestation_specification: attestation_pb2.AttestationSpecification ) -> str:
View Source
    def change_attestation_specification(
            self,
            attestation_id: str,
            attestation_specification: AttestationSpecification
    ) -> str:
        """
        Replace the attestation specification registered under the given
        id with the provided specification.
        """
        return self.modifications_builder.change_attestation_specification(
            attestation_id, attestation_specification
        )
#   def add_compute_node( self, node: decentriq_platform.node.Node, node_id: Optional[str] = None, attestation_id: Optional[str] = None ) -> str:
View Source
    def add_compute_node(
            self,
            node: Node,
            node_id: Optional[str] = None,
            attestation_id: Optional[str] = None
    ) -> str:
        """
        Add a new compute node. Specific compute node classes are provided either
        by the main package or by the respective compute submodules.

        **Parameters**:
        - `node`: The node object to add.
        - `node_id`: A custom identifier for this node. If not specified,
            the identifier is generated automatically.
        - `attestation_id`: An identifier for a concrete attestation specification
            to use for this compute node. If not specified, the specification is
            chosen automatically based on the type of compute node.

        **Returns**:
        The id that was assigned to the added compute node.
        This id is needed when running computations or when adding permissions
        concerning this node.
        """
        return self.modifications_builder.add_compute_node(
            node,
            node_id=node_id,
            attestation_id=attestation_id
        )

Add a new compute node. Specific compute node classes are provided either by the main package or by the respective compute submodules.

Parameters:

  • node: The node object to add.
  • node_id: A custom identifier for this node. If not specified, the identifier is generated automatically.
  • attestation_id: An identifier for a concrete attestation specification to use for this compute node. If not specified, the specification is chosen automatically based on the type of compute node.

Returns: The id that was assigned to the added compute node. This id is needed when running computations or when adding permissions concerning this node.

#   def add_user_permission( self, email: str, authentication_method: data_room_pb2.AuthenticationMethod, permissions: List[data_room_pb2.Permission] ):
View Source
    def add_user_permission(
            self,
            email: str,
            authentication_method: AuthenticationMethod,
            permissions: List[Permission],
    ):
        """
        Add permissions for a user.
        The authentication is performed on the enclave side based on the method supplied.
        """
        return self.modifications_builder.add_user_permission(
            email,
            authentication_method,
            permissions
        )

Add permissions for a user. The authentication is performed on the enclave side based on the method supplied.

#   def build(self):
View Source
    def build(self):
        """
        Build the data room configuration commit.

        The built commit still needs to be published and merged in order for
        it to be made part of the data room configuration.
        """
        return ConfigurationCommit(
            dataRoomId=bytes.fromhex(self.data_room_id),
            dataRoomHistoryPin=bytes.fromhex(self.history_pin),
            modifications=self.modifications_builder.build()
        )

Build the data room configuration commit.

The built commit still needs to be published and merged in order for it to be made part of the data room configuration.

#   class Permissions:
View Source
class Permissions:
    """Helper class for creating data room permissions."""

    def __init__(self):
        """This class is not meant to be instantiated."""

    @staticmethod
    def leaf_crud(leaf_node_name: str) -> Permission:
        """Permission required for publishing a dataset to a data room."""
        return Permission(
            leafCrudPermission=LeafCrudPermission(leafNodeName=leaf_node_name)
        )

    @staticmethod
    def retrieve_data_room() -> Permission:
        """Permission required for retrieving a data room's definition after it has
        been published."""
        return Permission(retrieveDataRoomPermission=RetrieveDataRoomPermission())

    @staticmethod
    def retrieve_audit_log() -> Permission:
        """Permission for retrieving the audit log, a log detailing all past interactions
        with the data room."""
        return Permission(retrieveAuditLogPermission=RetrieveAuditLogPermission())

    @staticmethod
    def execute_compute(compute_node_name: str) -> Permission:
        """Permission for executing the computation with the given name."""
        return Permission(
            executeComputePermission=ExecuteComputePermission(
                computeNodeName=compute_node_name
            )
        )

    @staticmethod
    def retrieve_data_room_status() -> Permission:
        """Permission for retrieving the status of a data room."""
        return Permission(
            retrieveDataRoomStatusPermission=RetrieveDataRoomStatusPermission()
        )

    @staticmethod
    def update_data_room_status() -> Permission:
        """Permission for updating the status of a data room (e.g. irreversibly stopping it)."""
        return Permission(
            updateDataRoomStatusPermission=UpdateDataRoomStatusPermission()
        )

    @staticmethod
    def retrieve_published_datasets() -> Permission:
        """Permission for retrieving the list of datasets that has been published to the data room."""
        return Permission(
            retrievePublishedDatasetsPermission=RetrievePublishedDatasetsPermission()
        )

    @staticmethod
    def dry_run() -> Permission:
        """Permission for triggering a dry run on the data room."""
        return Permission(dryRunPermission=DryRunPermission())

    @staticmethod
    def generate_merge_signature() -> Permission:
        """Permission for generating signatures required for merge approvals."""
        return Permission(
            generateMergeSignaturePermission=GenerateMergeSignaturePermission()
        )

    @staticmethod
    def execute_development_compute() -> Permission:
        """Permission for executing computations in development mode."""
        return Permission(
            executeDevelopmentComputePermission=ExecuteDevelopmentComputePermission()
        )

    @staticmethod
    def merge_configuration_commit() -> Permission:
        """
        Permission for merging configuration commits into the current data
        room configuration.
        """
        return Permission(
            mergeConfigurationCommitPermission=MergeConfigurationCommitPermission()
        )

Helper class for creating data room permissions.
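
For example, a typical permission list for a participant who may provide data to one node, run one computation, and inspect the data room might look like this (a sketch; the node ids are placeholders):

```
permissions = [
    dq.Permissions.leaf_crud("my_data_node"),
    dq.Permissions.execute_compute("my_computation"),
    dq.Permissions.retrieve_data_room(),
    dq.Permissions.retrieve_published_datasets(),
]
```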

#   Permissions()
View Source
    def __init__(self):
        """This class is not meant to be instantiated."""

This class is not meant to be instantiated.

#  
@staticmethod
def leaf_crud(leaf_node_name: str) -> data_room_pb2.Permission:
View Source
    @staticmethod
    def leaf_crud(leaf_node_name: str) -> Permission:
        """Permission required for publishing a dataset to a data room."""
        return Permission(
            leafCrudPermission=LeafCrudPermission(leafNodeName=leaf_node_name)
        )

Permission required for publishing a dataset to a data room.

#  
@staticmethod
def retrieve_data_room() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def retrieve_data_room() -> Permission:
        """Permission required for retrieving a data room's definition after it has
        been published."""
        return Permission(retrieveDataRoomPermission=RetrieveDataRoomPermission())

Permission required for retrieving a data room's definition after it has been published.

#  
@staticmethod
def retrieve_audit_log() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def retrieve_audit_log() -> Permission:
        """Permission for retrieving the audit log, a log detailing all past interactions
        with the data room."""
        return Permission(retrieveAuditLogPermission=RetrieveAuditLogPermission())

Permission for retrieving the audit log, a log detailing all past interactions with the data room.

#  
@staticmethod
def execute_compute(compute_node_name: str) -> data_room_pb2.Permission:
View Source
    @staticmethod
    def execute_compute(compute_node_name: str) -> Permission:
        """Permission for executing the computation with the given name."""
        return Permission(
            executeComputePermission=ExecuteComputePermission(
                computeNodeName=compute_node_name
            )
        )

Permission for executing the computation with the given name.

#  
@staticmethod
def retrieve_data_room_status() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def retrieve_data_room_status() -> Permission:
        """Permission for retrieving the status of a data room."""
        return Permission(
            retrieveDataRoomStatusPermission=RetrieveDataRoomStatusPermission()
        )

Permission for retrieving the status of a data room.

#  
@staticmethod
def update_data_room_status() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def update_data_room_status() -> Permission:
        """Permission for updating the status of a data room (e.g. irreversibly stopping it)."""
        return Permission(
            updateDataRoomStatusPermission=UpdateDataRoomStatusPermission()
        )

Permission for updating the status of a data room (e.g. irreversibly stopping it).

#  
@staticmethod
def retrieve_published_datasets() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def retrieve_published_datasets() -> Permission:
        """Permission for retrieving the list of datasets that has been published to the data room."""
        return Permission(
            retrievePublishedDatasetsPermission=RetrievePublishedDatasetsPermission()
        )

Permission for retrieving the list of datasets that have been published to the data room.

#  
@staticmethod
def dry_run() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def dry_run() -> Permission:
        """Permission for triggering a dry run on the data room."""
        return Permission(dryRunPermission=DryRunPermission())

Permission for triggering a dry run on the data room.

#  
@staticmethod
def generate_merge_signature() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def generate_merge_signature() -> Permission:
        """Permission for generating signatures required for merge approvals."""
        return Permission(
            generateMergeSignaturePermission=GenerateMergeSignaturePermission()
        )

Permission for generating signatures required for merge approvals.

#  
@staticmethod
def execute_development_compute() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def execute_development_compute() -> Permission:
        """Permission for executing computations in development mode."""
        return Permission(
            executeDevelopmentComputePermission=ExecuteDevelopmentComputePermission()
        )

Permission for executing computations in development mode.

#  
@staticmethod
def merge_configuration_commit() -> data_room_pb2.Permission:
View Source
    @staticmethod
    def merge_configuration_commit() -> Permission:
        """
        Permission for merging configuration commits into the current data
        room configuration.
        """
        return Permission(
            mergeConfigurationCommitPermission=MergeConfigurationCommitPermission()
        )

Permission for merging configuration commits into the current data room configuration.

#   class GovernanceProtocol:
View Source
class GovernanceProtocol:
    """
    The protocol that defines whether and how a data room can be changed
    after it has been published.
    """

    @staticmethod
    def static():
        """
        The data room cannot be changed after it has been published.
        Participants are still allowed to execute development computations
        as long as the required permissions have been granted.
        """
        return GovernanceProtocolProto(
            staticDataRoomPolicy=StaticDataRoomPolicy()
        )

    @staticmethod
    def affected_data_owners_approve():
        """
        The addition of compute nodes must be approved by all data owners
        on whose data the new compute node will depend.
        """
        return GovernanceProtocolProto(
            affectedDataOwnersApprovePolicy=AffectedDataOwnersApprovePolicy()
        )

The protocol that defines whether and how a data room can be changed after it has been published.

#   GovernanceProtocol()
#  
@staticmethod
def static():
View Source
    @staticmethod
    def static():
        """
        The data room cannot be changed after it has been published.
        Participants are still allowed to execute development computations
        as long as the required permissions have been granted.
        """
        return GovernanceProtocolProto(
            staticDataRoomPolicy=StaticDataRoomPolicy()
        )

The data room cannot be changed after it has been published. Participants are still allowed to execute development computations as long as the required permissions have been granted.

#  
@staticmethod
def affected_data_owners_approve():
View Source
    @staticmethod
    def affected_data_owners_approve():
        """
        The addition of compute nodes must be approved by all data owners
        on whose data the new compute node will depend.
        """
        return GovernanceProtocolProto(
            affectedDataOwnersApprovePolicy=AffectedDataOwnersApprovePolicy()
        )

The addition of compute nodes must be approved by all data owners on whose data the new compute node will depend.
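
The chosen protocol is simply passed to the `DataRoomBuilder` constructor (a sketch; `enclave_specs` as in the earlier examples):

```
# Allow compute nodes to be added later, subject to data owner approval.
builder = dq.DataRoomBuilder(
    "My flexible DCR",
    enclave_specs,
    governance_protocol=dq.GovernanceProtocol.affected_data_owners_approve(),
)
```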

#   enclave_specifications = <
#   class EnclaveSpecifications:
View Source
class EnclaveSpecifications:
    """
    Provider of the available enclave specifications provided by the Decentriq platform.

    Enclave specifications enable you to express which particular enclaves you trust.
    The field containing the measurement (e.g. `mrenclave` in the case of Intel SGX) identifies
    the exact binary that will process your data.
    Users of the Decentriq platform are encouraged to reproduce this value by building the enclave
    binary from audited source code and reproducing the measurement (in the case of Intel SGX,
    this would involve simply hashing the produced executable).

    When connecting to the driver enclave, the configured attestation algorithm will guarantee that the
    enclave you connect to is the one corresponding to the enclave specification you chose.
    The associated root certificate will be used to verify that the attestation was signed
    by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used).

    Any communication between the driver enclave and worker enclaves handling your data will also
    first be secured by additional attestation procedures. Which enclaves are trusted by the
    driver enclave is controlled by choosing the additional enclave specs from the respective
    compute packages.

    A list of enclave specifications, each encoding your trust in a particular enclave type, can
    be obtained by selecting a subset of the enclave specifications provided by the object
    `decentriq_platform.enclave_specifications`. Selecting the subset of versions should be done
    by calling its `versions` method.
    """

    def __init__(self, specifications: Dict[str, EnclaveSpecification]):
        self.specifications = specifications

    def list(self) -> List[str]:
        """Get a list of all available enclave identifiers."""
        return sorted(self.specifications.keys())

    def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]:
        """
        Get the enclave specifications for the given versioned enclave types.

        Make sure to always include the specification of a *driver enclave*, e.g.
        `"decentriq.driver:v1"` as this is the node with which you communicate directly.
        Add additional versioned enclaves depending on the compute module you use.
        Refer to the main documentation page of each compute module to learn which
        enclaves are available.
        """
        selected_specifications = {}
        for version in enclave_versions:
            enclave_type = version.split(":")[0]
            selected_specifications[enclave_type] = self.specifications[version]
        return selected_specifications

    def all(self) -> List[EnclaveSpecification]:
        """Get a list of all available enclave specifications."""
        return list(self.specifications.values())

    def merge(self, other):
        """
        Merge two sets of enclave specifications into a single set.
        """
        return EnclaveSpecifications({**self.specifications, **other.specifications})

Provider of the available enclave specifications provided by the Decentriq platform.

Enclave specifications enable you to express which particular enclaves you trust. The field containing the measurement (e.g. mrenclave in the case of Intel SGX) identifies the exact binary that will process your data. Users of the Decentriq platform are encouraged to reproduce this value by building the enclave binary from audited source code and reproducing the measurement (in the case of Intel SGX, this would involve simply hashing the produced executable).

When connecting to the driver enclave, the configured attestation algorithm will guarantee that the enclave you connect to is the one corresponding to the enclave specification you chose. The associated root certificate will be used to verify that the attestation was signed by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used).

Any communication between the driver enclave and worker enclaves handling your data will also first be secured by additional attestation procedures. Which enclaves are trusted by the driver enclave is controlled by choosing the additional enclave specs from the respective compute packages.

A list of enclave specifications, each encoding your trust in a particular enclave type, can be obtained by selecting a subset of the enclave specifications provided by the object decentriq_platform.enclave_specifications. Selecting the subset of versions should be done by calling its versions method.
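
For example (a sketch; the exact set of available identifiers depends on your SDK version, see the list at the top of this page):

```
import decentriq_platform as dq

# All enclave identifiers known to this SDK version.
print(dq.enclave_specifications.list())

# Select the specific versions you trust; the driver enclave must be included.
enclave_specs = dq.enclave_specifications.versions([
    "decentriq.driver:v4",
    "decentriq.sql-worker:v4",
])
```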

#   EnclaveSpecifications( specifications: Dict[str, decentriq_platform.types.EnclaveSpecification] )
View Source
    def __init__(self, specifications: Dict[str, EnclaveSpecification]):
        self.specifications = specifications
#   def list(self) -> List[str]:
View Source
    def list(self) -> List[str]:
        """Get a list of all available enclave identifiers."""
        return sorted(self.specifications.keys())

Get a list of all available enclave identifiers.

#   def versions( self, enclave_versions: List[str] ) -> Dict[str, decentriq_platform.types.EnclaveSpecification]:
View Source
    def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]:
        """
        Get the enclave specifications for the given versioned enclave types.

        Make sure to always include the specification of a *driver enclave*, e.g.
        `"decentriq.driver:v1"` as this is the node with which you communicate directly.
        Add additional versioned enclaves depending on the compute module you use.
        Refer to the main documentation page of each compute module to learn which
        enclaves are available.
        """
        selected_specifications = {}
        for version in enclave_versions:
            enclave_type = version.split(":")[0]
            selected_specifications[enclave_type] = self.specifications[version]
        return selected_specifications

Get the enclave specifications for the given versioned enclave types.

Make sure to always include the specification of a driver enclave, e.g. "decentriq.driver:v1" as this is the node with which you communicate directly. Add additional versioned enclaves depending on the compute module you use. Refer to the main documentation page of each compute module to learn which enclaves are available.

#   def all(self) -> List[decentriq_platform.types.EnclaveSpecification]:
View Source
    def all(self) -> List[EnclaveSpecification]:
        """Get a list of all available enclave specifications."""
        return list(self.specifications.values())

Get a list of all available enclave specifications.

#   def merge(self, other):
View Source
    def merge(self, other):
        """
        Merge two sets of enclave specifications into a single set.
        """
        return EnclaveSpecifications({**self.specifications, **other.specifications})

Merge two sets of enclave specifications into a single set.

#   class Key:
View Source
class Key():
    """
    This class wraps the key material that is used to encrypt the
    files that are uploaded to the Decentriq platform.
    """
    material: bytes

    def __init__(self, material: Optional[bytes] = None):
        """
        Returns a new `Key` instance; the raw key material can optionally be specified.
        """
        if material is None:
            key_bytes = os.urandom(KEY_LEN)
        else:
            if len(material) != KEY_LEN:
                raise Exception("Invalid key length, must be 32 bytes")
            key_bytes = material
        self.material = key_bytes

This class wraps the key material that is used to encrypt the files that are uploaded to the Decentriq platform.

#   Key(material: Optional[bytes] = None)
View Source
    def __init__(self, material: Optional[bytes] = None):
        """
        Returns a new `Key` instance; the raw key material can optionally be specified.
        """
        if material is None:
            key_bytes = os.urandom(KEY_LEN)
        else:
            if len(material) != KEY_LEN:
                raise Exception("Invalid key length, must be 32 bytes")
            key_bytes = material
        self.material = key_bytes

Returns a new Key instance; the raw key material can optionally be specified.
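
For example (a sketch; how the key is subsequently used to encrypt and upload a dataset is documented with the upload-related methods elsewhere in this documentation):

```
import os
import decentriq_platform as dq

# Let the SDK generate fresh random key material ...
key = dq.Key()

# ... or supply your own 32 bytes of key material (any other length raises an exception).
key_from_material = dq.Key(os.urandom(32))
```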

#   class StaticContent(decentriq_platform.node.Node):
View Source
class StaticContent(Node):
    """
    Computation node which outputs the content specified in its configuration.
    This is mostly used to allow users to specify dependencies on static
    content that is part of the DCR definition.
    """

    def __init__(
            self,
            name: str,
            content: bytes,
            dependencies: List[str] = []
    ) -> None:
        """
        Create a node with the given name and content.

        If the source of the content is a file on your local machine,
        you can open the file in binary mode before reading it:

        ```
        # Note the "rb" argument
        with open("my_script.py", "rb") as data:
            my_script_content = data.read()

        # my_script_content can now be passed to the StaticContent constructor
        ```
        """
        config = serialize_length_delimited(
            DriverTaskConfig(
                staticContent=StaticContentConfig(
                    content=content
                )
            )
        )
        super().__init__(
            name,
            config=config,
            enclave_type="decentriq.driver",
            dependencies=dependencies,
            output_format=ComputeNodeFormat.RAW
        )

Computation node which outputs the content specified in its configuration. This is mostly used to allow users to specify dependencies with static content that is part of the DCR definition.

#   StaticContent(name: str, content: bytes, dependencies: List[str] = [])
View Source
    def __init__(
            self,
            name: str,
            content: bytes,
            dependencies: List[str] = []
    ) -> None:
        """
        Create a node with the given name and content.

        In case the source of the content is a file on your local machine,
        you can open the file in binary mode before reading it:

        ```
        # Note the "rb" argument
        with open("my_script.py", "rb") as data:
            my_script_content = data.read()

        # my_script_content can now be passed to the StaticContent constructor
        ```
        """
        config = serialize_length_delimited(
            DriverTaskConfig(
                staticContent=StaticContentConfig(
                    content=content
                )
            )
        )
        super().__init__(
            name,
            config=config,
            enclave_type="decentriq.driver",
            dependencies=dependencies,
            output_format=ComputeNodeFormat.RAW
        )

Create a node with the given name and content.

In case the source of the content is a file on your local machine, you can open the file in binary mode before reading it:

# Note the "rb" argument
with open("my_script.py", "rb") as data:
    my_script_content = data.read()

# my_script_content can now be passed to the StaticContent constructor
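The resulting bytes can then be wrapped in a node, for instance (the node name below is illustrative):

```
# Create a static content node wrapping the script content
script_node = StaticContent("my_script_node", my_script_content)
```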
#   class Noop(decentriq_platform.node.Node):
View Source
class Noop(Node):
    """
    Computation node which does not perform any operation and produces an empty
    output. This is mostly used to allow users to test the execution of other
    computation nodes without giving access to the results.
    """

    def __init__(self, name: str, dependencies: List[str] = []) -> None:
        config = serialize_length_delimited(
            DriverTaskConfig(noop=NoopConfig())
        )
        super().__init__(
            name,
            config,
            "decentriq.driver",
            dependencies,
            ComputeNodeFormat.RAW
        )

Computation node which does not perform any operation and produces an empty output. This is mostly used to allow users to test the execution of other computation nodes without giving access to the results.
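For example, a no-op node that depends on another computation (node names are illustrative) could be declared as follows:

```
# Depends on "my_computation" but produces an empty output, allowing the
# upstream node to be exercised without exposing its results
test_node = Noop("test_noop", dependencies=["my_computation"])
```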

#   Noop(name: str, dependencies: List[str] = [])
View Source
    def __init__(self, name: str, dependencies: List[str] = []) -> None:
        config = serialize_length_delimited(
            DriverTaskConfig(noop=NoopConfig())
        )
        super().__init__(
            name,
            config,
            "decentriq.driver",
            dependencies,
            ComputeNodeFormat.RAW
        )
View Source
"""
.. include:: ../../../decentriq_platform_docs/sql_getting_started.md
---
"""
__docformat__ = "restructuredtext"

from .compute import SqlCompute, SqlSchemaVerifier
from .proto.compute_sql_pb2 import (
    PrimitiveType,
)
from .helpers import (
    TabularDataNodeBuilder,
    read_sql_query_result_as_string,
    read_input_csv_file,
    read_input_csv_string,
    upload_and_publish_tabular_dataset,
)


__all__ = [
    "SqlCompute",
    "TabularDataNodeBuilder",
    "PrimitiveType",
    "read_input_csv_file",
    "read_input_csv_string",
    "upload_and_publish_tabular_dataset",
    "read_sql_query_result_as_string",
    "SqlSchemaVerifier",
]

View Source
"""
.. include:: ../../../decentriq_platform_docs/container_getting_started.md
---
"""
__docformat__ = "restructuredtext"

from .compute import StaticContainerCompute
from .helpers import read_result_as_zipfile


__all__ = [
    "StaticContainerCompute",
    "read_result_as_zipfile",
]

View Source
import chily
import os
from hashlib import sha256
from .proto import serialize_length_delimited
from .proto import ChunkHeader, EncryptionHeader, VersionHeader
from typing import BinaryIO, Iterator, Tuple, Optional
from io import TextIOWrapper

__all__ = ["Key"]

KEY_LEN = 32

class Key():
    """
    This class wraps the key material that is used to encrypt the
    files that are uploaded to the decentriq platform.
    """
    material: bytes

    def __init__(self, material: Optional[bytes] = None):
        """
        Returns a new `Key` instance. The raw key material can optionally be specified.
        """
        if material is None:
            key_bytes = os.urandom(KEY_LEN)
        else:
            if len(material) != KEY_LEN:
                raise Exception("Invalid key length, must be 32 bytes")
            key_bytes = material
        self.material = key_bytes

def create_chunk_header(extra_entropy: bytes) -> bytes:
    chunk_header = ChunkHeader()
    chunk_header.extraEntropy = extra_entropy

    chunk_header_bytes = serialize_length_delimited(chunk_header)
    return chunk_header_bytes


def create_version_header() -> bytes:
    version_header = VersionHeader()
    version_header.version = 0
    return serialize_length_delimited(version_header)

# Returns (integrity hash, encrypted blob)
def create_encrypted_chunk(
        key: bytes,
        extra_entropy: bytes,
        data: bytes
) -> Tuple[bytes, bytes]:
    chunk_bytes = []

    version_header = create_version_header()
    chunk_bytes.append(version_header)

    chunk_header = create_chunk_header(extra_entropy)
    chunk_bytes.append(chunk_header)

    chunk_bytes.append(data)

    chunk = b''.join(chunk_bytes)
    chunk_hasher = sha256()
    chunk_hasher.update(chunk)
    chunk_hash = chunk_hasher.digest()

    cipher = StorageCipher(key)
    encrypted_chunk = cipher.encrypt(chunk)

    return chunk_hash, encrypted_chunk

class Chunker(Iterator):
    def __init__(self, input_stream: BinaryIO, chunk_size: int):
        self.chunk_size = chunk_size
        self.input_stream = input_stream

    def __iter__(self) -> Iterator[Tuple[bytes, bytes]]:
        self.input_stream.seek(0)
        return self

    # returns (hash, chunk)
    def __next__(self) -> Tuple[bytes, bytes]:
        version_header_bytes = create_version_header()
        chunk_header_bytes = create_chunk_header(os.urandom(16))

        # Does not account for header size
        current_chunk_size = 0
        chunk_bytes = [version_header_bytes, chunk_header_bytes]

        input_chunk_bytes = self.input_stream.read(self.chunk_size)
        if len(input_chunk_bytes) == 0:
            raise StopIteration
        chunk_bytes.append(input_chunk_bytes)

        chunk = b''.join(chunk_bytes)
        chunk_hasher = sha256()
        chunk_hasher.update(chunk)
        chunk_hash = chunk_hasher.digest()
        return chunk_hash, chunk


class StorageCipher():
    def __init__(self, symmetric_key: bytes):
        self.enc_key = symmetric_key
        self.cipher: chily.Cipher = chily.Cipher.from_symmetric(self.enc_key)

    def encrypt(self, data: bytes) -> bytes:
        nonce = chily.Nonce.from_random()
        encrypted_data = self.cipher.encrypt("storage cipher", data, nonce)

        encryption_header = EncryptionHeader()
        encryption_header.chilyKey.encryptionNonce = bytes(nonce.bytes)

        serialized_encryption_header = serialize_length_delimited(encryption_header)
        encrypted_data_with_header = bytes(list(serialized_encryption_header) + encrypted_data)
        return encrypted_data_with_header
View Source
from typing import Dict, List, Tuple
from .types import EnclaveSpecification
from .compute import GcgDriverDecoder
from .sql.compute import SqlWorkerDecoder
from .container.compute import ContainerWorkerDecoder
from .proto import (
    AttestationSpecification,
    AttestationSpecificationIntelEpid,
    AttestationSpecificationIntelDcap,
    AttestationSpecificationAwsNitro,
)
import asn1crypto.pem
from .certs import (
    aws_nitro_root_ca_pem,
    intel_sgx_dcap_root_ca,
    intel_sgx_ias_root_ca
)


intel_sgx_dcap_root_ca_der = asn1crypto.pem.unarmor(intel_sgx_dcap_root_ca)[2]
intel_sgx_ias_root_ca_der = asn1crypto.pem.unarmor(intel_sgx_ias_root_ca)[2]
aws_nitro_root_ca_der = asn1crypto.pem.unarmor(aws_nitro_root_ca_pem)[2]


SPECIFICATIONS = {
    "decentriq.driver:v4": EnclaveSpecification(
        name="decentriq.driver",
        version="3",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "f3746dc7b06d7f1aa6fc1fc9dbf61920663466bc75476a9b1460f6a112be71cf"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=GcgDriverDecoder(),
        clientProtocols=[1],
    ),
    "decentriq.driver:v3": EnclaveSpecification(
        name="decentriq.driver",
        version="3",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "564d4744604e252e046be2b2ba4d86fb7eb5a2ec85046735bd5abe62654b9d61"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=GcgDriverDecoder(),
        clientProtocols=[1],
    ),
    "decentriq.driver:v2": EnclaveSpecification(
        name="decentriq.driver",
        version="2",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "a88ec2195974edf693e45b2ccdf39cf53c9382bb5309ba3863b5d0f2591542f1"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=GcgDriverDecoder(),
        clientProtocols=[0],
    ),
    "decentriq.sql-worker:v4": EnclaveSpecification(
        name="decentriq.sql-worker",
        version="4",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "5f07de831d93b1ff5446ef0e17d8dcf0418ce8416cd0d5039c4266503fd4c7c9"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=SqlWorkerDecoder()
    ),
    "decentriq.sql-worker:v3": EnclaveSpecification(
        name="decentriq.sql-worker",
        version="3",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "5b1838ec6d3509fe1c1ac1b13d394bc9df76057ddd18c1b8fff882b379d110e2"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=SqlWorkerDecoder()
    ),
    "decentriq.sql-worker:v2": EnclaveSpecification(
        name="decentriq.sql-worker",
        version="2",
        proto=AttestationSpecification(
            intelDcap=AttestationSpecificationIntelDcap(
                mrenclave=bytes.fromhex(
                    "c04e429edd5fc767e00ec4839b1f8c57375fda178905866ac6d4debee4fbe7d1"
                ),
                dcapRootCaDer=intel_sgx_dcap_root_ca_der,
                accept_debug=False,
                accept_out_of_date=False,
                accept_configuration_needed=False,
                accept_sw_hardening_needed=False,
                accept_revoked=False,
            )
        ),
        workerProtocols=[0],
        decoder=SqlWorkerDecoder()
    ),
    "decentriq.python-ml-worker:v2": EnclaveSpecification(
        name="decentriq.python-ml-worker",
        version="2",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("ea004dce7a444fdf4603b903f9bc730494fc6c876ffd86fdf8f7c3910803b38ca60d0e38af0bbd44f159918d90fa46c4"),
                pcr1=bytes.fromhex("ea004dce7a444fdf4603b903f9bc730494fc6c876ffd86fdf8f7c3910803b38ca60d0e38af0bbd44f159918d90fa46c4"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    ),
    "decentriq.python-ml-worker:v1": EnclaveSpecification(
        name="decentriq.python-ml-worker",
        version="1",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("9103b7019cd97a2837b1631648bc683b3470b402b3c0638d8ff16178c7fd031407c06aeff8f96517a320fb573d7d281f"),
                pcr1=bytes.fromhex("9103b7019cd97a2837b1631648bc683b3470b402b3c0638d8ff16178c7fd031407c06aeff8f96517a320fb573d7d281f"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    ),
    "decentriq.python-synth-data-worker:v2": EnclaveSpecification(
        name="decentriq.python-synth-data-worker",
        version="2",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("3759aad200a137252997a459a38c1953c8868844c7271487ae536a441133c407fb2dd3e1d4b167d997a8aa48b54341c1"),
                pcr1=bytes.fromhex("3759aad200a137252997a459a38c1953c8868844c7271487ae536a441133c407fb2dd3e1d4b167d997a8aa48b54341c1"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    ),
    "decentriq.python-synth-data-worker:v1": EnclaveSpecification(
        name="decentriq.python-synth-data-worker",
        version="1",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("b1eeccf3396b7d2163b7ff90017dfea71911b282e7ac683574237b0f6011d1617cb42322342e43d8721ccb9aefb918f4"),
                pcr1=bytes.fromhex("b1eeccf3396b7d2163b7ff90017dfea71911b282e7ac683574237b0f6011d1617cb42322342e43d8721ccb9aefb918f4"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    ),
    "decentriq.r-latex-worker:v2": EnclaveSpecification(
        name="decentriq.r-latex-worker",
        version="2",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("43223aecfeae895fcb966b41618611f16d675991c472ad58d358a846bd9fa472d2eeabd130ad1aee97bf74067ccf5ac8"),
                pcr1=bytes.fromhex("43223aecfeae895fcb966b41618611f16d675991c472ad58d358a846bd9fa472d2eeabd130ad1aee97bf74067ccf5ac8"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    ),
    "decentriq.r-latex-worker:v1": EnclaveSpecification(
        name="decentriq.r-latex-worker",
        version="1",
        proto=AttestationSpecification(
            awsNitro=AttestationSpecificationAwsNitro(
                nitroRootCaDer=aws_nitro_root_ca_der,
                pcr0=bytes.fromhex("7e78d613db8801fe051abc1325eb1dfd12a42b3c8cd9ba6925c35d2bc91549c79b62577c1fd799bedc8ed8a3f85422e6"),
                pcr1=bytes.fromhex("7e78d613db8801fe051abc1325eb1dfd12a42b3c8cd9ba6925c35d2bc91549c79b62577c1fd799bedc8ed8a3f85422e6"),
                pcr2=bytes.fromhex("21b9efbc184807662e966d34f390821309eeac6802309798826296bf3e8bec7c10edb30948c90ba67310f7b964fc500a"),
                pcr8=bytes.fromhex("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
            )
        ),
        workerProtocols=[0],
        decoder=ContainerWorkerDecoder()
    )
}


class EnclaveSpecifications:
    """
    Provider of the available enclave specifications provided by the Decentriq platform.

    Enclave specifications enable you to express which particular enclaves you trust.
    The field containing the measurement (e.g. `mrenclave` in the case of Intel SGX) identifies
    the exact binary that will process your data.
    Users of the Decentriq platform are encouraged to reproduce this value by building the enclave
    binary from audited source code and recomputing the measurement (in the case of Intel SGX,
    this would involve simply hashing the produced executable).

    When connecting to the driver enclave, the configured attestation algorithm will guarantee that the
    enclave you connect to is the one corresponding to the enclave specification you chose.
    The associated root certificate will be used to verify that the attestation was signed
    by the expected party (e.g. Intel/AMD/Amazon, depending on the CC technology used).

    Any communication between the driver enclave and worker enclaves handling your data will also
    first be secured by additional attestation procedures. Which enclaves are trusted by the
    driver enclave is controlled by choosing the additional enclave specs from the respective
    compute packages.

    A list of enclave specifications, each encoding your trust in a particular enclave type, can
    be obtained by selecting a subset of the enclave specifications provided by the object
    `decentriq_platform.enclave_specifications`. Selecting the subset of versions should be done
    by calling its `versions` method.
    """

    def __init__(self, specifications: Dict[str, EnclaveSpecification]):
        self.specifications = specifications

    def list(self) -> List[str]:
        """Get a list of all available enclave identifiers."""
        return sorted(self.specifications.keys())

    def versions(self, enclave_versions: List[str]) -> Dict[str, EnclaveSpecification]:
        """
        Get the enclave specifications for the given versioned enclave types.

        Make sure to always include the specification of a *driver enclave*, e.g.
        `"decentriq.driver:v1"` as this is the node with which you communicate directly.
        Add additional versioned enclaves depending on the compute module you use.
        Refer to the main documentation page of each compute module to learn which
        enclaves are available.
        """
        selected_specifcations = {}
        for version in enclave_versions:
            enclave_type = version.split(":")[0]
            selected_specifcations[enclave_type] = self.specifications[version]
        return selected_specifcations

    def all(self) -> List[EnclaveSpecification]:
        """Get a list of all available enclave specifications."""
        return list(self.specifications.values())

    def merge(self, other):
        """
        Merge two sets of enclave specifications into a single set.
        """
        return EnclaveSpecifications({**self.specifications, **other.specifications})


enclave_specifications: EnclaveSpecifications = EnclaveSpecifications(SPECIFICATIONS)
"""The main catalogue of enclave specifications available within the Decentriq platform."""
View Source
from google.protobuf.json_format import MessageToDict
from .proto import AttestationSpecification, ComputeNodeProtocol, DriverTaskConfig
from typing import List, Dict, Optional, Any
from typing_extensions import TypedDict
from enum import Enum
from .proto.length_delimited import parse_length_delimited
from .proto.compute_sql_pb2 import SqlWorkerConfiguration
from .container.proto.compute_container_pb2 import ContainerWorkerConfiguration


__all__ = [
    "EnclaveSpecification",
    "JobId",
    "DataRoomDescription",
    "DatasetDescription",
]


class JobId:
    """
    Class for identifying running or already run jobs.

    Objects of this class can be used to retrieve results for processed computations.
    """
    def __init__(self, job_id: str, compute_node_id: str):
        self.id = job_id
        """The identifier of the job that processed a particular computation."""

        self.compute_node_id = compute_node_id
        """The id of the computation that was processed."""


class ScopeTypes(str, Enum):
    USER_FILE = "user_file",
    DATA_ROOM_INTERMEDIATE_DATA = "dataroom_intermediate_data"
    DATA_ROOM_COMMITS_DATA = "dataroom_commits_data"


class UserResponse(TypedDict):
    id: str
    email: str


class UserCsrRequest(TypedDict):
    csrPem: str


class UserCsrResponse(TypedDict):
    certChainPem: str


class SystemCaResponse(TypedDict):
    rootCertificate: str


class CreateSessionRequest(TypedDict):
    attestationSpecificationHash: str


class SessionJsonResponse(TypedDict):
    sessionId: str
    attestationSpecificationHash: str


class FinalizeUpload(TypedDict):
    uploadId: str
    manifest: str
    name: str
    manifestHash: str
    chunks: List[str]
    scopeId: str


class ChunkWrapper(TypedDict):
    hash: str
    data: str


class UploadDescription(TypedDict):
    uploadId: str


class ChunkDescription(TypedDict):
    chunkHash: str


class DataRoomDescription(TypedDict):
    dataRoomId: str
    name: str
    description: str
    mrenclave: str
    ownerEmail: str
    creationDate: str
    status: str


class DatasetDescription(TypedDict):
    """
    This class includes information about an uploaded dataset.
    """
    datasetId: str
    """
    The dataset id as a hex-encoded string. This id is also called the manifest hash.
    """
    name: str
    """The name of this dataset"""
    description: str
    """An optional description"""
    ownerEmail: str
    """The original uploader of the dataset"""
    creationDate: str


class SignatureResponse(TypedDict):
    type: str
    data: List[int]


class EnclaveMessage(TypedDict):
    data: str


class FatquoteResBody(TypedDict):
    fatquoteBase64: str


class DatasetManifestMetadata(TypedDict):
    name: str
    manifestHash: str
    chunks: List[str]


class EnclaveSpecification(TypedDict):
    """
    This class includes information about an enclave deployed in the platform.
    Please refer to `decentriq_platform.EnclaveSpecifications` for a detailed explanation.
    """
    name: str
    """The name of the enclave."""
    version: str
    """The version of the enclave."""
    proto: AttestationSpecification
    """The Protobuf object."""
    workerProtocols: List[int]
    """The worker protocol versions supported by the node"""
    decoder: Optional[Any]
    """
    Decoder object that can be used to decode the binary configs belonging
    to enclaves of this type.
    """
    clientProtocols: Optional[List[int]]
    """The client protocol versions supported by the node"""


class CreateScopeRequest(TypedDict):
    metadata: Dict[str, str]


class ScopeJson(TypedDict):
    scopeId: str
    metadata: Dict[str, str]


class EnclaveSpecificationJson(TypedDict):
    name: str
    version: str
    spec: str


class EnclaveSpecificationResponse(TypedDict):
    attestationSpecs: List[EnclaveSpecificationJson]


class Tcb(TypedDict):
    sgxtcbcomp01svn: int
    sgxtcbcomp02svn: int
    sgxtcbcomp03svn: int
    sgxtcbcomp04svn: int
    sgxtcbcomp05svn: int
    sgxtcbcomp06svn: int
    sgxtcbcomp07svn: int
    sgxtcbcomp08svn: int
    sgxtcbcomp09svn: int
    sgxtcbcomp10svn: int
    sgxtcbcomp11svn: int
    sgxtcbcomp12svn: int
    sgxtcbcomp13svn: int
    sgxtcbcomp14svn: int
    sgxtcbcomp15svn: int
    sgxtcbcomp16svn: int
    pcesvn: int


class TcbLevel(TypedDict):
    tcb: Tcb
    tcbStatus: str


class TcbInfo(TypedDict):
    version: int
    issueDate: str
    nextUpdate: str
    fmspc: str
    pceId: str
    tcbType: int
    tcbEvaluationDataNumber: int
    tcbLevels: List[TcbLevel]


class TcbInfoContainer(TypedDict):
    tcbInfo: TcbInfo
    signature: str


class IasResponse(TypedDict):
    isvEnclaveQuoteBody: str
    isvEnclaveQuoteStatus: str
View Source
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa

__all__ = ["Auth"]

PKey = rsa.RSAPrivateKey

class Auth:
    """
    This class wraps the certificate used to identify a user and implements the
    signing of the messages that are sent to the enclave
    """
    def __init__(
            self,
            certificate_chain: bytes,
            keypair: PKey,
            user_id: str,
    ):
        """
        Create an authentication object with the supplied certificate chain and
        keypair. To use the identity provider of the Decentriq platform, use
        `decentriq_platform.Client.create_auth`.
        """
        self.certificate_chain = certificate_chain
        self.kp = keypair
        self.user_id = user_id

    def _get_user_id(self) -> str:
        return self.user_id

    def _sign(self, data: bytes) -> bytes:
        return self.kp.sign(data, padding.PKCS1v15(), hashes.SHA512())

    def get_certificate_chain_pem(self) -> bytes:
        """
        Returns the chain of certificates in PEM format
        """
        return self.certificate_chain

class Sigma:
    def __init__(self, signature: bytes, mac_tag: bytes, auth_pki: Auth):
        self.signature: bytes = signature
        self.mac_tag: bytes = mac_tag
        self.auth_pki: Auth = auth_pki

    def get_mac_tag(self) -> bytes:
        return self.mac_tag

    def get_signature(self) -> bytes:
        return self.signature

    def get_cert_chain(self) -> bytes:
        return self.auth_pki.get_certificate_chain_pem()


def generate_key(bit_size: int = 4096) -> PKey:
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=bit_size,
    )
    return key


def generate_csr(user_email: str, key: PKey) -> bytes:
    csr_builder = x509.CertificateSigningRequestBuilder()
    csr = csr_builder.subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, user_email)
    ])).add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True
    ).add_extension(
        x509.KeyUsage(
            digital_signature=True,
            content_commitment=True,
            key_encipherment=True,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=False,
            crl_sign=False,
            encipher_only=False,
            decipher_only=False
        ),
        critical=True
    ).sign(key, hashes.SHA512())
    return csr.public_bytes(serialization.Encoding.PEM)
View Source
from typing import List, Optional, Tuple
import datetime
import base64
import json
import uuid
from .authentication import generate_csr, generate_key, Auth
from .types import (
    DataRoomDescription,
    DatasetDescription,
    UserCsrResponse,
    UserCsrRequest,
)
from .api import API, Endpoints
from .proto import (
    AuthenticationMethod, TrustedPki, DataRoom, DataRoomStatus, ConfigurationModification
)
from .config import (
    DECENTRIQ_CLIENT_ID,
    DECENTRIQ_API_PLATFORM_HOST, DECENTRIQ_API_PLATFORM_PORT, DECENTRIQ_API_PLATFORM_USE_TLS
)

__all__ = [
    "ClientPlatformFeatures", "SessionPlatformFeatures"
]


def platform_hash_from_str(s: str) -> bytes:
    """
    The reverse operation of `platform_hash_to_str`.
    """
    list_u8 = [int(x) for x in base64.b64decode(s).decode('ascii').split(',')]
    return bytes(list_u8)


def platform_hash_to_str(bs: bytes) -> str:
    """
    Transform a bytestring into the base64-encoded form expected by the platform backend.
    The platform stores hashes (both files and data rooms) in the following form:
    Given a series of bytes with values 1 2 3, it will store the ascii codes of the
    string '1,2,3' (including the commas), so we get the byte array 49 44 50 44 51.
    This is then base64 encoded and sent to the backend where it is base64 decoded.
    """
    return base64.b64encode(','.join([str(b) for b in bs]).encode('ascii')).decode('ascii')


_GET_DATASET_LINKS_QUERY = """
query getDatasetLinks($filter: DatasetLinkFilter!) {
    datasetLinks(filter: $filter) {
        nodes {
            datasetLinkUuid: datasetLinkId
            computeNode {
                computeNodeUuid: computeNodeId
                nodeName
                dataRoom {
                    dataRoomUuid: dataRoomId
                    dataRoomId: dataRoomHashEncoded
                }
            }
            dataset {
                name
                datasetId: datasetHashEncoded
            }
        }
    }
}
"""


class PlatformApi:
    """
    Class to interact with the GraphQL endpoint of the Decentriq platform.

    This class enables the integration of the Python SDK with the browser-based
    Decentriq platform UI by providing the necessary CRUD functions.
    """

    user_email: str
    _http_api: API

    def __init__(self, user_email: str, http_api: API):
        self.user_email = user_email
        self._http_api = http_api

    def get_data_room_descriptions(self) -> List[DataRoomDescription]:
        data = self._post_graphql(
            """
            query getDataRooms($filter: DataRoomFilter) {
                dataRooms(filter: $filter, orderBy: NAME_ASC) {
                    nodes {
                        dataRoomId: dataRoomHashEncoded
                        name
                        description
                        mrenclave
                        ownerEmail
                        creationDate: createdAt
                        state {
                            status
                        }
                    }
                }
            }
            """
        )
        return self._parse_get_data_rooms_response(data)

    def update_data_room_status(
            self,
            data_room_hash: str,
            status # type: DataRoomStatus.V
    ):
        data_room = self.get_data_room_by_hash(data_room_hash)
        if not data_room:
            raise Exception(f"Unable to find data room with hash '{data_room_hash}'")
        data_room_uuid = data_room["dataRoomUuid"]
        current_datetime_str = datetime.datetime.now().isoformat()
        self._post_graphql(
            """
            mutation upsertState($input: UpsertStateInput!) {
                upsertState(input: $input) {
                    state {
                        id
                        status
                        statusUpdatedAt
                        updatedAt
                        updatedByEmail
                    }
                }
            }
            """,
            {
                "input": {
                    "clientMutationId": str(uuid.uuid4()),
                    "state": {
                        "dataRoomId": data_room_uuid,
                        "status": DataRoomStatus.Name(status).upper(),
                        "statusUpdatedAt": current_datetime_str,
                    }
                }
            }
        )

    def get_dataset_link(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> Optional[dict]:
        data_room = self.get_data_room_by_hash(data_room_id)
        if not data_room:
            raise Exception(f"Unable to get data room for hash '{data_room_id}'")
        data_room_uuid = data_room["dataRoomUuid"]
        compute_node = self._get_compute_node(data_room_uuid, leaf_id)
        if compute_node:
            compute_node_uuid = compute_node["computeNodeUuid"]
            return self._get_dataset_link(compute_node_uuid)
        else:
            raise Exception(
                f"Unable to find leaf with name '{leaf_id}' for data room '{data_room_id}'"
            )

    def get_dataset_links_for_manifest_hash(
            self,
            manifest_hash: str,
    ) -> Optional[List[str]]:
        links = self._get_dataset_links_for_manifest_hash(manifest_hash)
        return [link["datasetLinkUuid"] for link in links]

    def _get_dataset_links_for_manifest_hash(
            self,
            manifest_hash: str,
    ) -> List[dict]:
        data = self._post_graphql(
            _GET_DATASET_LINKS_QUERY,
            {
                "filter": {
                    "datasetHashEncoded": {"equalTo": manifest_hash},
                }
            }
        )
        return data["datasetLinks"]["nodes"]

    def _get_dataset_link(self, compute_node_uuid: str) -> Optional[dict]:
        data = self._post_graphql(
            _GET_DATASET_LINKS_QUERY,
            {
                "filter": {
                    "computeNode": {
                        "computeNodeId": {"equalTo": compute_node_uuid}
                    }
                }
            }
        )

        nodes = data["datasetLinks"]["nodes"]
        if nodes:
            return nodes[0]
        else:
            return None

    def get_data_rooms_with_published_dataset(self, manifest_hash) -> List[str]:
        links = self._get_dataset_links_for_manifest_hash(manifest_hash)
        data_room_ids = []
        for link in links:
            if "computeNode" in link and "dataRoom" in link["computeNode"]:
                data_room_id = link["computeNode"]["dataRoom"].get("dataRoomId")
                if data_room_id:
                    data_room_ids.append(data_room_id)
        return data_room_ids

    def delete_dataset_link(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> Optional[dict]:
        dataset_link = self.get_dataset_link(data_room_id, leaf_id)
        if not dataset_link:
            raise Exception(
                f"Unable to find a dataset link for data room '{data_room_id}'" +
                f" and data node '{leaf_id}'"
            )
        else:
            self._post_graphql(
                """
                mutation deleteDatasetLink($input: DeleteDatasetLinkInput!) {
                    deleteDatasetLink(input: $input) {
                        clientMutationId
                    }
                }
                """,
                {
                    "input": {
                        "clientMutationId": str(uuid.uuid4()),
                        "datasetLinkId": dataset_link["datasetLinkUuid"]
                    }
                }
            )

    def delete_dataset_links_for_manifest_hash(
            self,
            manifest_hash: str,
    ) -> Optional[dict]:
        uuids = self.get_dataset_links_for_manifest_hash(manifest_hash)
        if uuids:
            for link_id in uuids:
                self._post_graphql(
                    """
                    mutation deleteDatasetLink($input: DeleteDatasetLinkInput!) {
                        deleteDatasetLink(input: $input) {
                            clientMutationId
                        }
                    }
                    """,
                    {
                        "input": {
                            "clientMutationId": str(uuid.uuid4()),
                            "datasetLinkId": link_id
                        }
                    }
                )

    def _delete_data_room(self, data_room_uuid: str) -> str:
        data = self._post_graphql(
            """
            mutation deleteDataRoom($input: DeleteDataRoomByIdInput!) {
                deleteDataRoomById(input: $input) {
                    clientMutationId
                    dataRoom {
                        id
                    }

                }
            }
            """,
            {
                "input": {
                    "clientMutationId": str(uuid.uuid4()),
                    "id": data_room_uuid,
                }
            }
        )
        deleted_id = data.get("deleteDataRoomById", {}).get("dataRoom", {}).get("id")
        if deleted_id is None:
            raise Exception(
                "Received malformed response when trying to delete DCR in backend"
            )
        else:
            return data_room_uuid

    def publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]],
            data_room_hash: str,
            attestation_specification_hash: str,
            additional_fields: dict = {},
    ):
        data_room, conf_modifications = data_room_definition
        owner_email = data_room.ownerEmail

        participant_emails = [
            op.add.element.userPermission.email
            for op in conf_modifications
            if op.HasField("add") and op.add.element.HasField("userPermission")
        ]

        if owner_email in participant_emails:
            participant_emails.remove(owner_email)

        user_permissions_input = []
        for participant_email in participant_emails:
            user_permissions_input.append({
                "email": participant_email,
            })

        data = self._post_graphql(
            """
            mutation createDataRoom($input: CreateDataRoomInput!) {
                createDataRoom(input: $input) {
                    dataRoom {
                        dataRoomUuid: dataRoomId
                        dataRoomHashEncoded
                        lock {
                            isLocked
                        }
                    }
                }
            }
            """,
            {
                "input": {
                    "clientMutationId": str(uuid.uuid4()),
                    "dataRoom": {
                        "dataRoomHash": platform_hash_to_str(bytes.fromhex(data_room_hash)),
                        "dataRoomHashEncoded": data_room_hash,
                        "name": data_room.name,
                        "description": data_room.description,
                        "mrenclave": attestation_specification_hash,
                        "source": "PYTHON",
                        "ownerEmail": data_room.ownerEmail,
                        "userPermissions": {
                            "create": user_permissions_input
                        },
                        **additional_fields,
                        "lock": {
                            "create": {
                                "isLocked": True
                            }
                        }
                    }
                }
            }
        )

        data_room_uuid = data.get("createDataRoom", {}).get("dataRoom", {}).get("dataRoomUuid")
        if data_room_uuid is None:
            raise Exception(
                "Received malformed response when trying to create DCR in backend"
            )

    def _parse_get_data_rooms_response(self, data) -> List[DataRoomDescription]:
        remove_keys = set(["state"])
        def payload_to_dcr_description(d):
            # A DCR without a state should be displayed as a DCR with "Active" status
            # Uppercase status such as "STOPPED" should be displayed as "Stopped" to match
            # the proto version.
            if d.get("state") and d["state"].get("status"):
                status = d["state"]["status"].capitalize()
            else:
                status = "Active"
            d_cleaned = {k: v for k, v in d.items() if k not in remove_keys}
            return DataRoomDescription(status=status, **d_cleaned)

        dcr_dicts = data.get("dataRooms", {}).get("nodes", [])
        dcr_descriptions = [payload_to_dcr_description(d) for d in dcr_dicts]

        # Remove non-published DCRs from list
        return [desc for desc in dcr_descriptions if desc["dataRoomId"]]

    def get_data_room_by_hash(self, data_room_hash: str) -> Optional[dict]:
        data = self._post_graphql(
            """
            query getDataRoomByHash($dataRoomHashEncoded: String!) {
                dataRooms(condition: {dataRoomHashEncoded: $dataRoomHashEncoded}) {
                    nodes {
                        dataRoomUuid: dataRoomId
                        source
                    }
                }
            }
            """,
            { "dataRoomHashEncoded": data_room_hash }
        )
        entries = data.get("dataRooms", {}).get("nodes", [])

        if len(entries) > 1:
            raise Exception("Cannot have multiple DCRs with the same hashcode")
        elif len(entries) == 0:
            return None
        else:
            return entries[0]

    def get_datasets_of_user(self) -> List[DatasetDescription]:
        data = self._post_graphql(
            """
            query getDatasets {
                datasets {
                    nodes {
                        datasetId: datasetHashEncoded
                        name
                        description
                        ownerEmail
                        creationDate: createdAt
                        datasetMeta {
                            description
                        }
                    }
                }
            }
            """
        )
        nodes = data.get("datasets", {}).get("nodes", [])
        datasets = []
        for node in nodes:
            meta_info = node.get("datasetMeta")
            if meta_info:
                description = meta_info.get("description")
            else:
                description = None
            datasets.append(
                DatasetDescription(
                    datasetId=node["datasetId"],
                    name=node["name"],
                    description=description,
                    ownerEmail=node["ownerEmail"],
                    creationDate=node["creationDate"]
                )
            )
        return datasets

    def save_dataset_metadata(
            self,
            manifest_hash: str,
            file_name: str,
            description: str,
            owner_email: str
    ):
        self._post_graphql(
            """
            mutation createDatasetMeta($input: CreateDatasetMetaInput!) {
                createDatasetMeta(input: $input) {
                    datasetMeta {
                        datasetHashEncoded
                        name
                        description
                    }
                }
            }
            """,
            {
                "input": {
                    "clientMutationId": str(uuid.uuid4()),
                    "datasetMeta": {
                        "datasetHash": platform_hash_to_str(bytes.fromhex(manifest_hash)),
                        "name": file_name,
                        "description": description,
                        "ownerEmail": owner_email,
                    }
                }
            }
        )

    def delete_dataset_metadata(self, manifest_hash: str):
        self._post_graphql(
            """
            mutation deleteDatasetMeta($datasetHashEncoded: String!) {
                deleteDatasetMetaByDatasetHashEncoded(input: {datasetHashEncoded: $datasetHashEncoded}) {
                    datasetMeta {
                        id
                    }
                }
            }
            """,
            {
                "datasetHashEncoded":  manifest_hash
            }
        )

    def create_dataset_link(
            self,
            data_room_id: str,
            manifest_hash: str,
            leaf_id: str
    ):
        data_room = self.get_data_room_by_hash(data_room_id)
        if not data_room:
            raise Exception(f"Unable to get data room id for hash {data_room_id}")
        else:
            if data_room["source"] == "WEB":
                data_room_uuid = data_room["dataRoomUuid"]
                compute_node = self._get_compute_node(data_room_uuid, leaf_id)
                if compute_node:
                    compute_node_uuid = compute_node["computeNodeUuid"]
                    # Can link to both compute nodes (type BRANCH) and leaf nodes (type LEAF).
                    # For SQL nodes we need to link to the verifier compute node.
                    if compute_node_uuid:
                        existing_link = self._get_dataset_link(compute_node_uuid)
                        if existing_link:
                            dataset_hash = existing_link["dataset"]["datasetId"]
                            raise Exception(
                                "The following dataset has already been published for this node." +
                                " Please unpublish this dataset first." +
                                f" Dataset: '{dataset_hash}'"
                            )
                        self._post_graphql(
                            """
                            mutation createDatasetLink($input: CreateDatasetLinkInput!) {
                                createDatasetLink(input: $input) {
                                    clientMutationId
                                    datasetLink {
                                        datasetHashEncoded
                                    }
                                }
                            }
                            """,
                            {
                                "input": {
                                    "clientMutationId": str(uuid.uuid4()),
                                    "datasetLink": {
                                        "datasetHash": platform_hash_to_str(
                                            bytes.fromhex(manifest_hash)
                                        ),
                                        "computeNodeId": compute_node_uuid,
                                    }
                                }
                            }
                        )
                else:
                    raise Exception(
                        f"Unable to find leaf with name '{leaf_id}' for data room '{data_room_id}'"
                    )
            else:
                pass

    def get_datasets_by_ids(
            self,
            manifest_hashes: List[str]
    ) -> List[DatasetDescription]:
        """
        Returns a list of datasets with the given ids.
        """
        data = self._post_graphql(
            """
            query getDatasets($filter: DatasetMetaFilter!) {
                datasetMetas(filter: $filter) {
                    nodes {
                        datasetId: datasetHashEncoded
                        name
                        description
                        ownerEmail
                        creationDate: createdAt
                    }
                }
            }
            """,
            {
                "filter": {
                    "datasetHashEncoded": {
                        "in": manifest_hashes
                    }
                }
            }
        )
        nodes = data.get("datasetMetas", {}).get("nodes", [])
        return [DatasetDescription(**node) for node in nodes]

    def _get_compute_node(self, data_room_uuid: str, leaf_id: str) -> Optional[dict]:
        data = self._post_graphql(
            """
            query getComputeNodes($filter: ComputeNodeFilter!) {
                computeNodes(filter: $filter) {
                    nodes {
                        computeNodeUuid: computeNodeId
                        nodeName
                        computeNodeType
                    }
                }
            }
            """,
            {
                "filter": {
                    "dataRoomId": { "equalTo": data_room_uuid },
                    "nodeName": { "equalTo": leaf_id },
                }
            }
        )
        nodes = data["computeNodes"]["nodes"]
        if nodes and len(nodes) == 1:
            return nodes[0]
        else:
            return None

    def _post_graphql(self, query: str, variables={}) -> dict:
        request_payload = { "query": query }
        if variables:
            request_payload["variables"] = variables
        response = self._http_api.post(
            "",
            json.dumps(request_payload),
            {"Content-type": "application/json"}
        )
        payload = response.json()
        if "errors" in payload:
            error_messages = []
            for message in payload["errors"]:
                decoded_message =\
                    base64.b64decode(message["message"]).decode('utf-8')
                error_messages.append(decoded_message)
            raise Exception(",".join(error_messages))
        elif "data" in payload:
            return payload["data"]
        else:
            raise Exception("Malformed GraphQL response: no 'data' or 'errors' key")


def create_platform_api(
    api_token: str,
    user_email: str,
    client_id: str = DECENTRIQ_CLIENT_ID,
    api_host: str = DECENTRIQ_API_PLATFORM_HOST,
    api_port: int = DECENTRIQ_API_PLATFORM_PORT,
    api_use_tls: bool = DECENTRIQ_API_PLATFORM_USE_TLS
):
    http_api = API(
        api_token,
        client_id,
        api_host,
        api_port,
        api_prefix="/api/decentriq-platform/graphql",
        use_tls=api_use_tls,
        additional_auth_headers={
            "Authorization-User-Email": user_email,
        }
    )
    return PlatformApi(user_email, http_api)


class ClientPlatformFeatures:
    """
    Provider of a list of methods and properties mirroring what is offered by the Decentriq
    web platform.
    """
    _http_api: API
    _platform_api: PlatformApi

    def __init__(
        self,
        api_token: str,
        user_email: str,
        http_api: API,
        client_id: str = DECENTRIQ_CLIENT_ID,
        api_host: str = DECENTRIQ_API_PLATFORM_HOST,
        api_port: int = DECENTRIQ_API_PLATFORM_PORT,
        api_use_tls: bool = DECENTRIQ_API_PLATFORM_USE_TLS,
    ):
        """
        Creating objects of this class directly is not necessary, as an instance of this class
        is provided as part of each `Client` object.
        """
        self._http_api = http_api
        self._platform_api = create_platform_api(api_token, user_email, client_id, api_host, api_port, api_use_tls)
        self.user_email = user_email

    @property
    def decentriq_ca_root_certificate(self) -> bytes:
        """
        Returns the root certificate used by the Decentriq identity provider.
        Note that when using this certificate in any authentication scheme, you trust Decentriq as an identity provider!
        """
        url = Endpoints.SYSTEM_CERTIFICATE_AUTHORITY
        response = self._http_api.get(url).json()
        return response["rootCertificate"].encode("utf-8")

    @property
    def decentriq_pki_authentication(self) -> AuthenticationMethod:
        """
        The authentication method that uses the Decentriq root certificate to authenticate
        users.

        This method should be specified when building a data room in case you want to interact
        with that data room either via the web interface or with sessions created using
        `create_auth_using_decentriq_pki`.
        Note that when using this authentication method you trust Decentriq as an identity provider!

        You can also create an `AuthenticationMethod` object directly and supply your own root certificate,
        with which to authenticate users connecting to your data room.
        In this case you will also need to issue corresponding user certificates and create your
        own custom `decentriq_platform.authentication.Auth` objects.
        """
        root_pki = self.decentriq_ca_root_certificate
        return AuthenticationMethod(
            trustedPki=TrustedPki(rootCertificatePem=root_pki)
        )

    def create_auth_using_decentriq_pki(self, email: str = None) -> Auth:
        """
        Creates a `decentriq_platform.authentication.Auth` object which can be
        attached to a `decentriq_platform.session.Session`.
        Sessions created using such an `Auth` object will commonly be used with
        data rooms that have been configured to use the `decentriq_pki_authentication`
        authentication method.
        """
        email = email if email is not None else self.user_email
        keypair = generate_key()
        csr = generate_csr(email, keypair)
        url = Endpoints.USER_CERTIFICATE.replace(":userId", email)
        csr_req = UserCsrRequest(csrPem=csr.decode("utf-8"))
        resp: UserCsrResponse = self._http_api.post(url, req_body=json.dumps(csr_req)).json()
        cert_chain_pem = resp["certChainPem"].encode("utf-8")
        auth = Auth(cert_chain_pem, keypair, email)
        return auth

    def get_data_room_descriptions(self) -> List[DataRoomDescription]:
        """
        Returns a list of descriptions of all the data rooms a user created
        or participates in.
        """
        return self._platform_api.get_data_room_descriptions()

    def get_available_datasets(self) -> List[DatasetDescription]:
        """
        Returns a list of datasets that the current user uploaded, regardless
        of whether they have already been connected to a data room or not.
        """
        return self._platform_api.get_datasets_of_user()


class SessionPlatformFeatures:
    """
    A provider for methods that mirror functionality known from the Decentriq
    web platform.

    This class is similar to `ClientPlatformFeatures` but methods provided on
    this class require communication with an enclave.
    """

    _platform_api: PlatformApi

    def __init__(self, session, api: PlatformApi):
        """
        Instances of this class should not be created directly but rather be
        accessed via the `Session.platform` field.
        """
        self._session = session
        self._platform_api = api

    def get_published_datasets(
            self,
            data_room_id: str
    ) -> List[DatasetDescription]:
        """
        Get a list of all the datasets that were published for a given data room.
        """
        response = self._session.retrieve_published_datasets(data_room_id)
        manifest_hashes =\
            [dataset.datasetHash.hex() for dataset in response.publishedDatasets]
        return self._platform_api.get_datasets_by_ids(manifest_hashes)
View Source
from __future__ import annotations
import chily
import re
import hashlib
import hmac
import json
from base64 import b64encode, b64decode
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
from typing import Any, List, Tuple, TYPE_CHECKING, Iterator, Optional, Dict
from time import sleep
from .platform import PlatformApi
from .api import Endpoints
from .authentication import Auth, Sigma
from .proto import (
    DataRoom, DataRoomConfiguration, ConfigurationCommit, ConfigurationModification,
    ComputeNodeProtocol, DataNoncePubkey, Request, Response,
    CreateDataRoomRequest, CreateDataRoomResponse,
    ExecuteComputeRequest, ExecuteComputeResponse, GcgRequest, GcgResponse, GetResultsRequest,
    GetResultsResponseChunk, JobStatusRequest, JobStatusResponse,
    PublishDatasetToDataRoomRequest, PublishDatasetToDataRoomResponse,
    RemovePublishedDatasetRequest, RemovePublishedDatasetResponse,
    RetrieveAuditLogRequest, RetrieveAuditLogResponse, RetrieveDataRoomRequest,
    RetrieveDataRoomResponse, RetrieveDataRoomStatusRequest,
    RetrievePublishedDatasetsRequest,
    RetrievePublishedDatasetsResponse, UpdateDataRoomStatusRequest,
    ExecuteDevelopmentComputeRequest, CreateConfigurationCommitRequest,
    GenerateMergeApprovalSignatureRequest, MergeConfigurationCommitRequest,
    MergeConfigurationCommitResponse,
    RetrieveDataRoomConfigurationHistoryRequest, RetrieveDataRoomConfigurationHistoryResponse,
    UpdateDataRoomStatusResponse, DataRoomStatus, AttestationSpecification, Fatquote,
    RetrieveConfigurationCommitApproversRequest,
    RetrieveConfigurationCommitRequest
)
from .proto.length_delimited import parse_length_delimited, serialize_length_delimited
from .storage import Key
from .types import FatquoteResBody, EnclaveMessage, ScopeTypes, JobId
from .verification import QuoteBody, Verification
from .platform import SessionPlatformFeatures
if TYPE_CHECKING:
    from .client import Client


__all__ = [ "Session", "LATEST_GCG_PROTOCOL_VERSION", "LATEST_WORKER_PROTOCOL_VERSION" ]


LATEST_GCG_PROTOCOL_VERSION = 1
LATEST_WORKER_PROTOCOL_VERSION = 0


def _get_data_room_id(create_data_room_response: CreateDataRoomResponse) -> bytes:
    response_type = create_data_room_response.WhichOneof("create_data_room_response")
    if response_type is None:
        raise Exception("Empty CreateDataRoomResponse")
    elif response_type == "dataRoomId":
        return create_data_room_response.dataRoomId
    elif response_type == "dataRoomValidationError":
        raise Exception(
            "DataRoom creation failed",
            create_data_room_response.dataRoomValidationError
        )
    else:
        raise Exception("Unknown response type for CreateDataRoomResponse", response_type)


def datanoncepubkey_to_message(
        encrypted_data: bytes,
        nonce: bytes,
        pubkey: bytes,
        sigma_auth: Sigma
) -> DataNoncePubkey:
    message = DataNoncePubkey()
    message.data = encrypted_data
    message.nonce = nonce
    message.pubkey = pubkey
    message.pki.certChainPem = sigma_auth.get_cert_chain()
    message.pki.signature = sigma_auth.get_signature()
    message.pki.idMac = sigma_auth.get_mac_tag()
    return message


def message_to_datanoncepubkey(message: bytes) -> Tuple[bytes, bytes, bytes]:
    parsed_msg = DataNoncePubkey()
    parse_length_delimited(message, parsed_msg)
    return (parsed_msg.data, parsed_msg.nonce, parsed_msg.pubkey)


class Session():
    """
    Class for managing the communication with an enclave.
    """
    client: Client
    session_id: str
    enclave_identifier: str
    auth: Auth
    email: str
    keypair: Any
    fatquote: Fatquote
    quote: QuoteBody
    driver_attestation_specification: AttestationSpecification
    client_protocols: List[int]

    def __init__(
            self,
            client: Client,
            session_id: str,
            driver_attestation_specification: AttestationSpecification,
            client_protocols: List[int],
            auth: Auth,
            platform_api: Optional[PlatformApi] = None,
    ):
        """
        `Session` instances should not be instantiated directly but rather
        be created via a `Client` object using `decentriq_platform.Client.create_session`.
        """
        url = Endpoints.SESSION_FATQUOTE.replace(":sessionId", session_id)
        response: FatquoteResBody = client._api.get(url).json()
        fatquote_bytes = b64decode(response["fatquoteBase64"])
        fatquote = Fatquote()
        fatquote.ParseFromString(fatquote_bytes)
        verification = Verification(attestation_specification=driver_attestation_specification)
        report_data = verification.verify(fatquote)
        self.client = client
        self.session_id = session_id
        self.auth = auth
        self.email = auth.user_id
        self.keypair = chily.Keypair.from_random()
        self.fatquote = fatquote
        self.report_data = report_data
        self.driver_attestation_specification = driver_attestation_specification
        self.client_protocols = client_protocols

        if platform_api:
            self._platform = SessionPlatformFeatures(self, platform_api)
        else:
            self._platform = None

    def _get_client_protocol(self, endpoint_protocols: List[int]) -> int:
        try:
            protocol = max(set(self.client_protocols) & set(endpoint_protocols))
            return protocol
        except ValueError:
            min_enclave_version = min(self.client_protocols)
            max_endpoint_version = max(endpoint_protocols)
            exception_message = "Endpoint is only available with protocol versions {} but the enclave only supports {}\n".format(
                endpoint_protocols,
                self.client_protocols
            )
            if min_enclave_version > max_endpoint_version:
                exception_message += "Try upgrading to a newer version of the SDK"
            else:
                exception_message += "Try using an older version of the SDK"
            raise Exception(exception_message)

    def _get_enclave_pubkey(self):
        pub_keyB = bytearray(self.report_data[:32])
        return chily.PublicKey.from_bytes(pub_keyB)

    def _encrypt_and_encode_data(self, data: bytes, auth: Auth) -> bytes:
        nonce = chily.Nonce.from_random()
        cipher = chily.Cipher(
            self.keypair.secret, self._get_enclave_pubkey()
        )
        enc_data = cipher.encrypt("client sent session data", data, nonce)
        public_keys = bytes(self.keypair.public_key.bytes) + bytes(self._get_enclave_pubkey().bytes)
        signature = auth._sign(public_keys)
        shared_key = bytes(self.keypair.secret.diffie_hellman(self._get_enclave_pubkey()).bytes)
        hkdf = HKDF(algorithm=hashes.SHA512(), length=64, info=b"IdP KDF Context", salt=b"")
        mac_key = hkdf.derive(shared_key)
        mac_tag = hmac.digest(mac_key, auth._get_user_id().encode(), "sha512")
        sigma_auth = Sigma(signature, mac_tag, auth)
        return datanoncepubkey_to_message(
            bytes(enc_data),
            bytes(nonce.bytes),
            bytes(self.keypair.public_key.bytes),
            sigma_auth
        )

    def _decode_and_decrypt_data(self, data: bytes) -> bytes:
        dec_data, nonceB, _ = message_to_datanoncepubkey(data)
        cipher = chily.Cipher(
            self.keypair.secret, self._get_enclave_pubkey()
        )
        return cipher.decrypt("client received session data", dec_data, chily.Nonce.from_bytes(nonceB))

    def send_request(
            self,
            request: GcgRequest,
            protocol: int,
    ) -> List[GcgResponse]:
        """
        Low-level method for sending a raw `GcgRequest` to the enclave.
        Use this method if none of the convenience methods (such as `run_computation`)
        performs the exact task you want.
        """
        gcg_protocol = serialize_length_delimited(
            ComputeNodeProtocol(
                version=protocol
            )
        )
        serialized_request = serialize_length_delimited(
            Request(
                deltaRequest= self._encrypt_and_encode_data(
                    gcg_protocol + serialize_length_delimited(request),
                    self.auth
                )
            )
        )
        url = Endpoints.SESSION_MESSAGES.replace(":sessionId", self.session_id)
        enclave_request = EnclaveMessage(data=b64encode(serialized_request).decode("ascii"))
        enclave_response: bytes = self.client._api.post(
            url, json.dumps(enclave_request), {"Content-type": "application/json", "Accept-Version": "2"}
        ).content

        responses: List[GcgResponse] = []
        offset = 0
        while offset < len(enclave_response):
            response_container = Response()
            offset += parse_length_delimited(enclave_response[offset:], response_container)
            if response_container.HasField("unsuccessfulResponse"):
                raise Exception(response_container.unsuccessfulResponse)
            else:
                response = GcgResponse()
                decrypted_response = self._decode_and_decrypt_data(
                    response_container.successfulResponse
                )
                response_protocol = ComputeNodeProtocol()
                response_offset = parse_length_delimited(
                        decrypted_response, response_protocol
                )
                if response_protocol.version != protocol:
                    raise Exception("Different response protocol version than requested")
                parse_length_delimited(decrypted_response[response_offset:], response)
                if response.HasField("failure"):
                    raise Exception(response.failure)
                responses.append(response)
        return responses
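    # Usage sketch (illustrative only): sending a raw `GcgRequest` directly. The
    # request below mirrors what `retrieve_data_room_definition` does internally;
    # `session` and `data_room_id` are assumed to be defined by the caller.
    #
    #   request = GcgRequest(
    #       retrieveDataRoomRequest=RetrieveDataRoomRequest(
    #           dataRoomId=bytes.fromhex(data_room_id)
    #       )
    #   )
    #   responses = session.send_request(request, LATEST_GCG_PROTOCOL_VERSION)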

    def _publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]],
    ) -> CreateDataRoomResponse:
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        data_room, conf_modifications = data_room_definition
        if not data_room.ownerEmail:
            data_room.ownerEmail = self.email
        request = CreateDataRoomRequest(
            dataRoom=data_room,
            initialConfiguration=conf_modifications,
        )
        responses = self.send_request(GcgRequest(createDataRoomRequest=request), protocol)
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]

        if response.HasField("createDataRoomResponse"):
            if response.createDataRoomResponse.HasField("dataRoomValidationError"):
                raise Exception(
                    "Error when validating data room: {} (compute node id '{}')".format(
                        response.createDataRoomResponse.dataRoomValidationError.message,
                        response.createDataRoomResponse.dataRoomValidationError.computeNodeId
                    )
                )
            else:
                if self.is_integrated_with_platform:
                    data_room_hash = response.createDataRoomResponse.dataRoomId
                    self._create_data_room_in_platform(data_room_definition, data_room_hash)
        else:
            raise Exception(
                "Expected createDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.createDataRoomResponse

    def publish_data_room(
            self,
            data_room_definition: Tuple[DataRoom, List[ConfigurationModification]]
    ) -> str:
        """
        Create a data room with the provided protobuf configuration object
        and have the enclave apply the given list of modifications to the data
        room configuration.

        The id returned from this method will be used when interacting with the
        published data room (for example when running computations or publishing
        datasets).
        """
        response = self._publish_data_room(data_room_definition)
        return _get_data_room_id(response).hex()
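    # Usage sketch (illustrative only): publishing a data room definition that
    # was assembled elsewhere, e.g. with a builder object. `data_room_definition`
    # is assumed to be a `(DataRoom, [ConfigurationModification])` tuple.
    #
    #   data_room_id = session.publish_data_room(data_room_definition)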

    def publish_data_room_configuration_commit(
            self,
            configuration_commit: ConfigurationCommit
    ) -> str:
        """
        Publish the given data room configuration commit.

        Configuration commits can be built using a `DataRoomCommitBuilder` object.

        The id returned from this method will be used when running development
        computations or when trying to merge this commit into the main
        data room configuration.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": configuration_commit.dataRoomId.hex(),
            }
        )
        request = CreateConfigurationCommitRequest(
            commit=configuration_commit,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(createConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("createConfigurationCommitResponse"):
            raise Exception(
                "Expected createConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.createConfigurationCommitResponse.commitId.hex()
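    # Usage sketch (illustrative only): publishing a configuration commit built
    # with a `DataRoomCommitBuilder` (assumed to be assembled elsewhere as
    # `commit`) and keeping the returned id for development runs or merging.
    #
    #   commit_id = session.publish_data_room_configuration_commit(commit)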

    def retrieve_configuration_commit(
        self,
        configuration_commit_id: str,
    ) -> ConfigurationCommit:
        """
        Retrieve the content of the configuration commit with the given id.

        **Returns**:
        A `ConfigurationCommit`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitResponse.commit

    def retrieve_configuration_commit_approvers(
        self,
        configuration_commit_id: str,
    ) -> List[str]:
        """
        Retrieve the list of users who need to approve the merging of a given
        configuration commit.

        **Returns**:
        A list of ids belonging to the users that need to approve the
        configuration commit.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = RetrieveConfigurationCommitApproversRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveConfigurationCommitApproversRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("retrieveConfigurationCommitApproversResponse"):
            raise Exception(
                "Expected retrieveConfigurationCommitApproversResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveConfigurationCommitApproversResponse.approvers


    def generate_merge_approval_signature(
            self,
            configuration_commit_id: str
    ) -> bytes:
        """
        Generate an approval signature required for merging a configuration
        commit.

        To merge a specific configuration commit, each user referenced in the list
        of ids returned by `retrieve_configuration_commit_approvers` needs to
        generate an approval signature using this method.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = GenerateMergeApprovalSignatureRequest(
            commitId=bytes.fromhex(configuration_commit_id),
            scope=bytes.fromhex(scope_id),
        )
        responses = self.send_request(
            GcgRequest(generateMergeApprovalSignatureRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("generateMergeApprovalSignatureResponse"):
            raise Exception(
                "Expected generateMergeApprovalSignatureResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.generateMergeApprovalSignatureResponse.signature

    def merge_configuration_commit(
        self,
        configuration_commit_id: str,
        approval_signatures: Dict[str, bytes],
    ) -> MergeConfigurationCommitResponse:
        """
        Request the enclave to merge the given configuration commit into the
        main data room configuration.

        **Parameters**:
        - `configuration_commit_id`: The id of the commit to be merged.
        - `approval_signatures`: A dictionary containing the approval signature for
            each of the required approvers, e.g. `{ "some@email.com": signature }`.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = MergeConfigurationCommitRequest(
            scope=bytes.fromhex(scope_id),
            commitId=bytes.fromhex(configuration_commit_id),
            approvalSignatures=approval_signatures,
        )
        responses = self.send_request(
            GcgRequest(mergeConfigurationCommitRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if not response.HasField("mergeConfigurationCommitResponse"):
            raise Exception(
                "Expected mergeConfigurationCommitResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.mergeConfigurationCommitResponse
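    # Usage sketch (illustrative only): the full approval flow for merging a
    # configuration commit. In practice each approver generates their signature
    # with their own session; a single `session` is shown here for brevity.
    #
    #   approvers = session.retrieve_configuration_commit_approvers(commit_id)
    #   signatures = {
    #       approver: session.generate_merge_approval_signature(commit_id)
    #       for approver in approvers
    #   }
    #   session.merge_configuration_commit(commit_id, signatures)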

    def retrieve_data_room_configuration_history(
        self,
        data_room_id: str
    ) -> RetrieveDataRoomConfigurationHistoryResponse:
        """
        Retrieve the current merged data room configuration, as well as the
        history of configuration commits that have already been merged.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomConfigurationHistoryRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomConfigurationHistoryRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomConfigurationHistoryResponse"):
            raise Exception(
                "Expected retrieveDataRoomConfigurationHistoryResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.retrieveDataRoomConfigurationHistoryResponse

    def retrieve_current_data_room_configuration(
            self,
            data_room_id: str
    ) -> Tuple[DataRoomConfiguration, str]:
        """
        Retrieve the current data room configuration, as well as the current "history pin".

        A history pin is the hash of all the ids of configuration commits that
        make up the structure of a data room. This pin therefore uniquely identifies
        a data room's structure at a certain point in time.
        A data room configuration, as well as its associated history pin, can be used
        to extend an existing data room (for example by adding new compute nodes).
        Extending an existing data room is done using the `DataRoomCommitBuilder` class.
        """
        response = self.retrieve_data_room_configuration_history(data_room_id)
        return (response.currentConfiguration, response.pin.hex())
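    # Usage sketch (illustrative only): fetching the current configuration and
    # history pin, e.g. before extending a data room with a `DataRoomCommitBuilder`.
    #
    #   configuration, history_pin = \
    #       session.retrieve_current_data_room_configuration(data_room_id)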

    def stop_data_room(self, data_room_id: str):
        """
        Stop the data room with the given id, making it impossible to run new
        computations.
        """
        self._update_data_room_status(data_room_id, DataRoomStatus.Value("Stopped"))

    def _update_data_room_status(
            self,
            data_room_id: str,
            status # type: DataRoomStatus.V
    ) -> UpdateDataRoomStatusResponse:
        """
        Update the status of the data room.

        For the special case of stopping a data room, the method
        `stop_data_room` can be used.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = UpdateDataRoomStatusRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
            status=status,
        )
        responses = self.send_request(
            GcgRequest(updateDataRoomStatusRequest=request),
            protocol
        )

        if len(responses) != 1:
            raise Exception("Malformed response")

        response = responses[0]
        if response.HasField("updateDataRoomStatusResponse"):
            if self.is_integrated_with_platform:
                self.platform._platform_api.update_data_room_status(data_room_id, status)
        else:
            raise Exception(
                "Expected updateDataRoomStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return response.updateDataRoomStatusResponse

    def retrieve_data_room_status(
            self,
            data_room_id: str
    ) -> str:
        """
        Returns the status of the data room. Valid values are `"Active"` or `"Stopped"`.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveDataRoomStatusRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomStatusResponse"):
            raise Exception(
                "Expected retrieveDataRoomStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )

        return DataRoomStatus.Name(response.retrieveDataRoomStatusResponse.status)
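    # Usage sketch (illustrative only): stopping a data room and verifying the
    # status change.
    #
    #   session.stop_data_room(data_room_id)
    #   assert session.retrieve_data_room_status(data_room_id) == "Stopped"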

    def retrieve_data_room_definition(
            self,
            data_room_id: str
    ) -> RetrieveDataRoomResponse:
        """
        Returns the underlying protobuf object for the data room.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = RetrieveDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveDataRoomResponse"):
            raise Exception(
                "Expected retrieveDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveDataRoomResponse

    def retrieve_audit_log(
            self,
            data_room_id: str
    ) -> RetrieveAuditLogResponse:
        """
        Returns the audit log for the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrieveAuditLogRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrieveAuditLogRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrieveAuditLogResponse"):
            raise Exception(
                "Expected retrieveAuditLogResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrieveAuditLogResponse

    def publish_dataset(
            self,
            data_room_id: str,
            manifest_hash: str,
            leaf_id: str,
            key: Key,
            *,
            force: bool = False
    ) -> PublishDatasetToDataRoomResponse:
        """
        Publishes a file and its encryption key to a data room.
        Neither the file nor the encryption key will ever be stored in
        unencrypted form.

        This method will check whether the to-be-published file exists.
        If this is not the case, an exception will be raised.
        This behavior can be disabled by setting the `force` flag.

        In case the original client was created with platform integration
        enabled, the method will further check whether a dataset has already
        been published for the given data node.
        In this case, an exception will be thrown and the dataset
        will need to be unpublished first.
        """

        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        dataset = self.client.get_dataset(manifest_hash)
        if not dataset and not force:
            raise Exception(
                "The dataset you are trying to publish does not exist"
            )

        should_create_dataset_links =\
            self.is_integrated_with_platform and \
            self._is_web_data_room(data_room_id)

        if should_create_dataset_links:
            existing_link = self.platform._platform_api.get_dataset_link(
                data_room_id,
                self._convert_data_node_name_to_verifier_node(leaf_id)
            )
            if existing_link:
                existing_dataset = existing_link["dataset"]["datasetId"]
                raise Exception(
                    "The following dataset has already been published for this node." +
                    " Please unpublish this dataset first." +
                    f" Dataset: '{existing_dataset}'"
                )

        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = PublishDatasetToDataRoomRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            datasetHash=bytes.fromhex(manifest_hash),
            leafName=leaf_id,
            encryptionKey=key.material,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(publishDatasetToDataRoomRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("publishDatasetToDataRoomResponse"):
            raise Exception(
                "Expected publishDatasetToDataRoomResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if should_create_dataset_links:
                self.platform._platform_api.create_dataset_link(
                    data_room_id,
                    manifest_hash,
                    self._convert_data_node_name_to_verifier_node(leaf_id)
                )

        return response.publishDatasetToDataRoomResponse
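    # Usage sketch (illustrative only): publishing a previously uploaded dataset
    # to a data leaf node. `manifest_hash` is assumed to be the hash obtained
    # from the upload step, `key` the `Key` used to encrypt the data, and
    # "my_data_leaf" a leaf id defined in the data room.
    #
    #   session.publish_dataset(data_room_id, manifest_hash, "my_data_leaf", key)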

    def _convert_data_node_name_to_verifier_node(self, node_name: str) -> str:
        pattern = re.compile(r"@table/(.*)/dataset")
        match = pattern.match(node_name)
        if match:
            return match.group(1)
        else:
            return node_name

    def remove_published_dataset(
            self,
            data_room_id: str,
            leaf_id: str,
    ) -> RemovePublishedDatasetResponse:
        """
        Removes a published dataset from the data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RemovePublishedDatasetRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
            leafName=leaf_id,
        )
        responses = self.send_request(
            GcgRequest(removePublishedDatasetRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("removePublishedDatasetResponse"):
            raise Exception(
                "Expected removePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        else:
            if self.is_integrated_with_platform and self._is_web_data_room(data_room_id):
                try:
                    self.platform._platform_api.delete_dataset_link(
                        data_room_id,
                        self._convert_data_node_name_to_verifier_node(leaf_id)
                    )
                except Exception as error:
                    print(f"Error when deleting dataset link on platform: {error}")

        return response.removePublishedDatasetResponse

    def retrieve_published_datasets(
            self,
            data_room_id: str,
    ) -> RetrievePublishedDatasetsResponse:
        """
        Returns the datasets published to the given data room.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = RetrievePublishedDatasetsRequest(
            scope=bytes.fromhex(scope_id),
            dataRoomId=bytes.fromhex(data_room_id),
        )
        responses = self.send_request(
            GcgRequest(retrievePublishedDatasetsRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("retrievePublishedDatasetsResponse"):
            raise Exception(
                "Expected retrievePublishedDatasetResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.retrievePublishedDatasetsResponse

    def _submit_dev_compute(
            self,
            configuration_commit_id: str,
            compute_node_ids: List[str],
            /, *,
            dry_run: bool = False,
    ) -> ExecuteComputeResponse:
        """
        Submits a computation request which will generate an execution plan to
        perform the computation of the given goal nodes.
        """
        endpoint_protocols = [1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_COMMITS_DATA,
            }
        )
        request = ExecuteDevelopmentComputeRequest(
            configurationCommitId=bytes.fromhex(configuration_commit_id),
            computeNodeNames=compute_node_ids,
            isDryRun=dry_run,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(executeDevelopmentComputeRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("executeComputeResponse"):
            raise Exception(
                "Expected executeComputeResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.executeComputeResponse

    def _submit_compute(
            self,
            data_room_id: str,
            compute_node_ids: List[str],
            /, *,
            dry_run: bool = False,
    ) -> ExecuteComputeResponse:
        """
        Submits a computation request which will generate an execution plan to
        perform the computation of the given goal nodes.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        scope_id = self.client._ensure_scope_with_metadata(
            self.email,
            {
                "type": ScopeTypes.DATA_ROOM_INTERMEDIATE_DATA,
                "data_room_id": data_room_id,
            }
        )
        request = ExecuteComputeRequest(
            dataRoomId=bytes.fromhex(data_room_id),
            computeNodeNames=compute_node_ids,
            isDryRun=dry_run,
            scope=bytes.fromhex(scope_id)
        )
        responses = self.send_request(
            GcgRequest(executeComputeRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("executeComputeResponse"):
            raise Exception(
                "Expected executeComputeResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.executeComputeResponse

    def get_computation_status(self, job_id: str) -> JobStatusResponse:
        """
        Returns the status of the provided `job_id`, including the names
        of the nodes that have completed their execution.
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = JobStatusRequest(
            jobId=bytes.fromhex(job_id),
        )
        responses = self.send_request(
            GcgRequest(jobStatusRequest=request),
            protocol
        )
        if len(responses) != 1:
            raise Exception("Malformed response")
        response = responses[0]
        if not response.HasField("jobStatusResponse"):
            raise Exception(
                "Expected jobStatusResponse, got "
                + str(response.WhichOneof("gcg_response"))
            )
        return response.jobStatusResponse

    def _stream_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> Iterator[GetResultsResponseChunk]:
        """
        Streams the results of the provided `job_id`
        """
        endpoint_protocols = [0, 1]
        protocol = self._get_client_protocol(endpoint_protocols)
        request = GetResultsRequest(
            jobId=job_id,
            computeNodeName=compute_node_id,
        )
        responses = self.send_request(
            GcgRequest(getResultsRequest=request),
            protocol
        )
        for response in responses:
            if response.HasField("getResultsResponseChunk"):
                yield response.getResultsResponseChunk
            elif response.HasField("getResultsResponseFooter"):
                return
            else:
                raise Exception(
                    "Expected getResultsResponseChunk or getResultsResponseFooter, got "
                    + str(response.WhichOneof("gcg_response"))
                )
        raise Exception("Enclave connection aborted while streaming results")

    def _get_job_results(
            self,
            job_id: bytes,
            compute_node_id: str,
    ) -> bytes:
        """
        Returns the results of the provided `job_id`
        """
        return b"".join(list(map(lambda chunk: chunk.data, self._stream_job_results(job_id, compute_node_id))))

    def run_computation(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the data room with the given id.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_compute(data_room_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)

    def wait_until_computation_has_finished(
        self,
        job_id: JobId,
        /, *,
        interval: int = 5,
        timeout: Optional[int] = None
    ):
        """
        Wait for the given job to complete.

        The method will check for the job's completion every `interval` seconds, for up to
        an optional `timeout` seconds, after which it will raise an exception.
        """
        elapsed = 0
        while True:
            if timeout is not None and elapsed > timeout:
                raise Exception(
                    f"Timeout when trying to get result for job {job_id.id} of"
                    f" {job_id.compute_node_id} (waited {timeout} seconds)"
                )
            elif job_id.compute_node_id in self.get_computation_status(job_id.id).completeComputeNodeNames:
                break
            else:
                sleep(interval)
                elapsed += interval

    def run_dev_computation(
            self,
            configuration_commit_id: str,
            compute_node_id: str,
            /, *,
            dry_run: bool = False,
    ) -> JobId:
        """
        Run a specific computation within the context of the data room configuration
        defined by the given commit id.
        Such "development" computations can also be run for configuration commits
        that have not yet been merged.

        The result will be an identifier object of the job executing the computation.
        This object is required for checking a job's status and retrieving its results.
        """
        response = self._submit_dev_compute(configuration_commit_id, [compute_node_id], dry_run=dry_run)
        return JobId(response.jobId.hex(), compute_node_id)
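    # Usage sketch (illustrative only): running a computation against an
    # unmerged configuration commit, e.g. while developing a new compute node.
    #
    #   job_id = session.run_dev_computation(commit_id, "my_new_node")
    #   result = session.get_computation_result(job_id)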

    def get_computation_result(
            self,
            job_id: JobId,
            /, *,
            interval: int = 5,
            timeout: Optional[int] = None
    ) -> bytes:
        """
        Wait for the given job to complete and retrieve its results as a raw byte string.

        The method will check for the job's completion every `interval` seconds, for up to
        an optional `timeout` seconds, after which it will raise an exception.
        If the job completes and the results can be retrieved successfully, a raw byte string
        will be returned. The byte string can be transformed into a more useful object using
        a variety of helper methods. These helper methods are specific to the type of computation
        you ran and can be found in the corresponding packages.
        """
        elapsed = 0
        job_id_bytes = bytes.fromhex(job_id.id)
        self.wait_until_computation_has_finished(
            job_id,
            interval=interval,
            timeout=timeout
        )
        results = self._get_job_results(job_id_bytes, job_id.compute_node_id)
        return results
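    # Usage sketch (illustrative only): the typical two-step compute flow, with
    # "my_compute_node" standing in for an actual compute node id.
    #
    #   job_id = session.run_computation(data_room_id, "my_compute_node")
    #   raw_result = session.get_computation_result(job_id, timeout=600)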

    def run_computation_and_get_results(
            self,
            data_room_id: str,
            compute_node_id: str,
            /, *,
            interval: int = 5,
            timeout: Optional[int] = None
    ) -> Optional[bytes]:
        """
        Run a specific computation and return its results.

        This method is simply a wrapper that runs `run_computation` and `get_computation_result`
        directly after each other.
        """
        job_id = self.run_computation(data_room_id, compute_node_id)
        return self.get_computation_result(
            job_id,
            interval=interval,
            timeout=timeout
        )
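    # Usage sketch (illustrative only): the one-call variant of the flow shown
    # above.
    #
    #   raw_result = session.run_computation_and_get_results(
    #       data_room_id, "my_compute_node", timeout=600
    #   )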

    def _create_data_room_in_platform(
            self,
            data_room_definition: DataRoom,
            data_room_hash: bytes
    ):
        attestation_spec_type =\
                self.driver_attestation_specification.WhichOneof("attestation_specification")

        if attestation_spec_type == "intelEpid":
            mrenclave = self.driver_attestation_specification.intelEpid.mrenclave
        elif attestation_spec_type == "intelDcap":
            mrenclave = self.driver_attestation_specification.intelDcap.mrenclave
        else:
            raise Exception("Unknown attestation specification type")

        attestation_specification_hash = hashlib.sha256(
            serialize_length_delimited(
                self.driver_attestation_specification
            )
        ).hexdigest()

        self.platform._platform_api.publish_data_room(
            data_room_definition,
            data_room_hash=data_room_hash.hex(),
            attestation_specification_hash=attestation_specification_hash
        )

    def _is_web_data_room(self, data_room_id: str) -> bool:
        data_room = self.platform._platform_api.get_data_room_by_hash(data_room_id)
        if data_room:
            return data_room["source"] == "WEB"
        else:
            raise Exception(f"Unable to find data room with id '{data_room_id}'")

    @property
    def platform(self) -> SessionPlatformFeatures:
        """
        Provider of a list of convenience methods to interact with the
        Decentriq platform.

        This field exposes an object that provides a set of convenience features
        known from the Decentriq web platform. These include, for example,
        getting the list of data sets that are also visible in the browser-based
        platform UI.
        """
        if self._platform:
            return self._platform
        else:
            raise Exception(
                "This field is not set as the client from wich this session has"
                " been derived has not been configured with integration to the web"
                " platform."
            )

    @property
    def is_integrated_with_platform(self):
        return self._platform is not None
View Source
from typing import List, Any, Optional
import uuid


__all__ = [ "Node" ]


class Node:
    name: str
    """The name of the node."""

    config: bytes
    """Serialized configuration to use in the compute node definition."""

    enclave_type: str
    """What type of enclave this node uses."""

    dependencies: List[str]
    """A list of names of nodes on which this node directly depends."""

    output_format: Any
    """The `proto.ComputeNodeFormat` of the output from this node."""

    def __init__(
            self,
            name: str,
            config: bytes,
            enclave_type: str,
            dependencies: List[str],
            output_format: Any
    ):
        self.name = name
        self.config = config
        self.enclave_type = enclave_type
        self.dependencies = dependencies
        self.output_format = output_format
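# Usage sketch (illustrative only): compute-node definitions used by the
# higher-level builders are instances of `Node`. Worker-specific packages
# normally construct these for you; all values below are placeholders.
#
#   node = Node(
#       name="my_computation",
#       config=b"...",                        # serialized, worker-specific config
#       enclave_type="decentriq.sql-worker",  # one of the available worker types
#       dependencies=["my_input_table"],
#       output_format=None,                   # a proto.ComputeNodeFormat value
#   )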